<!doctype html>
<html style='font-size:24px !important'>
<head>
<meta charset='UTF-8'><meta name='viewport' content='width=device-width, initial-scale=1'>

<link href='https://fonts.loli.net/css?family=Open+Sans:400italic,700italic,700,400&subset=latin,latin-ext' rel='stylesheet' type='text/css' /><style type='text/css'>html {overflow-x: initial !important;}:root { --bg-color:#ffffff; --text-color:#333333; --select-text-bg-color:#B5D6FC; --select-text-font-color:auto; --monospace:"Lucida Console",Consolas,"Courier",monospace; --title-bar-height:20px; }
.mac-os-11 { --title-bar-height:28px; }
html { font-size: 14px; background-color: var(--bg-color); color: var(--text-color); font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; -webkit-font-smoothing: antialiased; }
body { margin: 0px; padding: 0px; height: auto; inset: 0px; font-size: 1rem; line-height: 1.42857; overflow-x: hidden; background: inherit; tab-size: 4; }
iframe { margin: auto; }
a.url { word-break: break-all; }
a:active, a:hover { outline: 0px; }
.in-text-selection, ::selection { text-shadow: none; background: var(--select-text-bg-color); color: var(--select-text-font-color); }
#write { margin: 0px auto; height: auto; width: inherit; word-break: normal; overflow-wrap: break-word; position: relative; white-space: normal; overflow-x: visible; padding-top: 36px; }
#write.first-line-indent p { text-indent: 2em; }
#write.first-line-indent li p, #write.first-line-indent p * { text-indent: 0px; }
#write.first-line-indent li { margin-left: 2em; }
.for-image #write { padding-left: 8px; padding-right: 8px; }
body.typora-export { padding-left: 30px; padding-right: 30px; }
.typora-export .footnote-line, .typora-export li, .typora-export p { white-space: pre-wrap; }
.typora-export .task-list-item input { pointer-events: none; }
@media screen and (max-width: 500px) {
  body.typora-export { padding-left: 0px; padding-right: 0px; }
  #write { padding-left: 20px; padding-right: 20px; }
  .CodeMirror-sizer { margin-left: 0px !important; }
  .CodeMirror-gutters { display: none !important; }
}
#write li > figure:last-child { margin-bottom: 0.5rem; }
#write ol, #write ul { position: relative; }
img { max-width: 100%; vertical-align: middle; image-orientation: from-image; }
button, input, select, textarea { color: inherit; font: inherit; }
input[type="checkbox"], input[type="radio"] { line-height: normal; padding: 0px; }
*, ::after, ::before { box-sizing: border-box; }
#write h1, #write h2, #write h3, #write h4, #write h5, #write h6, #write p, #write pre { width: inherit; }
#write h1, #write h2, #write h3, #write h4, #write h5, #write h6, #write p { position: relative; }
p { line-height: inherit; }
h1, h2, h3, h4, h5, h6 { break-after: avoid-page; break-inside: avoid; orphans: 4; }
p { orphans: 4; }
h1 { font-size: 2rem; }
h2 { font-size: 1.8rem; }
h3 { font-size: 1.6rem; }
h4 { font-size: 1.4rem; }
h5 { font-size: 1.2rem; }
h6 { font-size: 1rem; }
.md-math-block, .md-rawblock, h1, h2, h3, h4, h5, h6, p { margin-top: 1rem; margin-bottom: 1rem; }
.hidden { display: none; }
.md-blockmeta { color: rgb(204, 204, 204); font-weight: 700; font-style: italic; }
a { cursor: pointer; }
sup.md-footnote { padding: 2px 4px; background-color: rgba(238, 238, 238, 0.7); color: rgb(85, 85, 85); border-radius: 4px; cursor: pointer; }
sup.md-footnote a, sup.md-footnote a:hover { color: inherit; text-transform: inherit; text-decoration: inherit; }
#write input[type="checkbox"] { cursor: pointer; width: inherit; height: inherit; }
figure { overflow-x: auto; margin: 1.2em 0px; max-width: calc(100% + 16px); padding: 0px; }
figure > table { margin: 0px; }
tr { break-inside: avoid; break-after: auto; }
thead { display: table-header-group; }
table { border-collapse: collapse; border-spacing: 0px; width: 100%; overflow: auto; break-inside: auto; text-align: left; }
table.md-table td { min-width: 32px; }
.CodeMirror-gutters { border-right: 0px; background-color: inherit; }
.CodeMirror-linenumber { user-select: none; }
.CodeMirror { text-align: left; }
.CodeMirror-placeholder { opacity: 0.3; }
.CodeMirror pre { padding: 0px 4px; }
.CodeMirror-lines { padding: 0px; }
div.hr:focus { cursor: none; }
#write pre { white-space: pre-wrap; }
#write.fences-no-line-wrapping pre { white-space: pre; }
#write pre.ty-contain-cm { white-space: normal; }
.CodeMirror-gutters { margin-right: 4px; }
.md-fences { font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; overflow: visible; white-space: pre; background: inherit; position: relative !important; }
.md-fences-adv-panel { width: 100%; margin-top: 10px; text-align: center; padding-top: 0px; padding-bottom: 8px; overflow-x: auto; }
#write .md-fences.mock-cm { white-space: pre-wrap; }
.md-fences.md-fences-with-lineno { padding-left: 0px; }
#write.fences-no-line-wrapping .md-fences.mock-cm { white-space: pre; overflow-x: auto; }
.md-fences.mock-cm.md-fences-with-lineno { padding-left: 8px; }
.CodeMirror-line, twitterwidget { break-inside: avoid; }
.footnotes { opacity: 0.8; font-size: 0.9rem; margin-top: 1em; margin-bottom: 1em; }
.footnotes + .footnotes { margin-top: 0px; }
.md-reset { margin: 0px; padding: 0px; border: 0px; outline: 0px; vertical-align: top; background: 0px 0px; text-decoration: none; text-shadow: none; float: none; position: static; width: auto; height: auto; white-space: nowrap; cursor: inherit; -webkit-tap-highlight-color: transparent; line-height: normal; font-weight: 400; text-align: left; box-sizing: content-box; direction: ltr; }
li div { padding-top: 0px; }
blockquote { margin: 1rem 0px; }
li .mathjax-block, li p { margin: 0.5rem 0px; }
li blockquote { margin: 1rem 0px; }
li { margin: 0px; position: relative; }
blockquote > :last-child { margin-bottom: 0px; }
blockquote > :first-child, li > :first-child { margin-top: 0px; }
.footnotes-area { color: rgb(136, 136, 136); margin-top: 0.714rem; padding-bottom: 0.143rem; white-space: normal; }
#write .footnote-line { white-space: pre-wrap; }
@media print {
  body, html { border: 1px solid transparent; height: 99%; break-after: avoid; break-before: avoid; font-variant-ligatures: no-common-ligatures; }
  #write { margin-top: 0px; padding-top: 0px; border-color: transparent !important; }
  .typora-export * { -webkit-print-color-adjust: exact; }
  .typora-export #write { break-after: avoid; }
  .typora-export #write::after { height: 0px; }
  .is-mac table { break-inside: avoid; }
  .typora-export-show-outline .typora-export-sidebar { display: none; }
}
.footnote-line { margin-top: 0.714em; font-size: 0.7em; }
a img, img a { cursor: pointer; }
pre.md-meta-block { font-size: 0.8rem; min-height: 0.8rem; white-space: pre-wrap; background: rgb(204, 204, 204); display: block; overflow-x: hidden; }
p > .md-image:only-child:not(.md-img-error) img, p > img:only-child { display: block; margin: auto; }
#write.first-line-indent p > .md-image:only-child:not(.md-img-error) img { left: -2em; position: relative; }
p > .md-image:only-child { display: inline-block; width: 100%; }
#write .MathJax_Display { margin: 0.8em 0px 0px; }
.md-math-block { width: 100%; }
.md-math-block:not(:empty)::after { display: none; }
.MathJax_ref { fill: currentcolor; }
[contenteditable="true"]:active, [contenteditable="true"]:focus, [contenteditable="false"]:active, [contenteditable="false"]:focus { outline: 0px; box-shadow: none; }
.md-task-list-item { position: relative; list-style-type: none; }
.task-list-item.md-task-list-item { padding-left: 0px; }
.md-task-list-item > input { position: absolute; top: 0px; left: 0px; margin-left: -1.2em; margin-top: calc(1em - 10px); border: none; }
.math { font-size: 1rem; }
.md-toc { min-height: 3.58rem; position: relative; font-size: 0.9rem; border-radius: 10px; }
.md-toc-content { position: relative; margin-left: 0px; }
.md-toc-content::after, .md-toc::after { display: none; }
.md-toc-item { display: block; color: rgb(65, 131, 196); }
.md-toc-item a { text-decoration: none; }
.md-toc-inner:hover { text-decoration: underline; }
.md-toc-inner { display: inline-block; cursor: pointer; }
.md-toc-h1 .md-toc-inner { margin-left: 0px; font-weight: 700; }
.md-toc-h2 .md-toc-inner { margin-left: 2em; }
.md-toc-h3 .md-toc-inner { margin-left: 4em; }
.md-toc-h4 .md-toc-inner { margin-left: 6em; }
.md-toc-h5 .md-toc-inner { margin-left: 8em; }
.md-toc-h6 .md-toc-inner { margin-left: 10em; }
@media screen and (max-width: 48em) {
  .md-toc-h3 .md-toc-inner { margin-left: 3.5em; }
  .md-toc-h4 .md-toc-inner { margin-left: 5em; }
  .md-toc-h5 .md-toc-inner { margin-left: 6.5em; }
  .md-toc-h6 .md-toc-inner { margin-left: 8em; }
}
a.md-toc-inner { font-size: inherit; font-style: inherit; font-weight: inherit; line-height: inherit; }
.footnote-line a:not(.reversefootnote) { color: inherit; }
.md-attr { display: none; }
.md-fn-count::after { content: "."; }
code, pre, samp, tt { font-family: var(--monospace); }
kbd { margin: 0px 0.1em; padding: 0.1em 0.6em; font-size: 0.8em; color: rgb(36, 39, 41); background: rgb(255, 255, 255); border: 1px solid rgb(173, 179, 185); border-radius: 3px; box-shadow: rgba(12, 13, 14, 0.2) 0px 1px 0px, rgb(255, 255, 255) 0px 0px 0px 2px inset; white-space: nowrap; vertical-align: middle; }
.md-comment { color: rgb(162, 127, 3); opacity: 0.6; font-family: var(--monospace); }
code { text-align: left; vertical-align: initial; }
a.md-print-anchor { white-space: pre !important; border-width: initial !important; border-style: none !important; border-color: initial !important; display: inline-block !important; position: absolute !important; width: 1px !important; right: 0px !important; outline: 0px !important; background: 0px 0px !important; text-decoration: initial !important; text-shadow: initial !important; }
.os-windows.monocolor-emoji .md-emoji { font-family: "Segoe UI Symbol", sans-serif; }
.md-diagram-panel > svg { max-width: 100%; }
[lang="flow"] svg, [lang="mermaid"] svg { max-width: 100%; height: auto; }
[lang="mermaid"] .node text { font-size: 1rem; }
table tr th { border-bottom: 0px; }
video { max-width: 100%; display: block; margin: 0px auto; }
iframe { max-width: 100%; width: 100%; border: none; }
.highlight td, .highlight tr { border: 0px; }
mark { background: rgb(255, 255, 0); color: rgb(0, 0, 0); }
.md-html-inline .md-plain, .md-html-inline strong, mark .md-inline-math, mark strong { color: inherit; }
.md-expand mark .md-meta { opacity: 0.3 !important; }
mark .md-meta { color: rgb(0, 0, 0); }
@media print {
  .typora-export h1, .typora-export h2, .typora-export h3, .typora-export h4, .typora-export h5, .typora-export h6 { break-inside: avoid; }
}
.md-diagram-panel .messageText { stroke: none !important; }
.md-diagram-panel .start-state { fill: var(--node-fill); }
.md-diagram-panel .edgeLabel rect { opacity: 1 !important; }
.md-fences.md-fences-math { font-size: 1em; }
.md-fences-advanced:not(.md-focus) { padding: 0px; white-space: nowrap; border: 0px; }
.md-fences-advanced:not(.md-focus) { background: inherit; }
.typora-export-show-outline .typora-export-content { max-width: 1440px; margin: auto; display: flex; flex-direction: row; }
.typora-export-sidebar { width: 300px; font-size: 0.8rem; margin-top: 80px; margin-right: 18px; }
.typora-export-show-outline #write { -webkit-flex: 2; flex: 2 1 0%; }
.typora-export-sidebar .outline-content { position: fixed; top: 0px; max-height: 100%; overflow: hidden auto; padding-bottom: 30px; padding-top: 60px; width: 300px; }
@media screen and (max-width: 1024px) {
  .typora-export-sidebar, .typora-export-sidebar .outline-content { width: 240px; }
}
@media screen and (max-width: 800px) {
  .typora-export-sidebar { display: none; }
}
.outline-content li, .outline-content ul { margin-left: 0px; margin-right: 0px; padding-left: 0px; padding-right: 0px; list-style: none; }
.outline-content ul { margin-top: 0px; margin-bottom: 0px; }
.outline-content strong { font-weight: 400; }
.outline-expander { width: 1rem; height: 1.42857rem; position: relative; display: table-cell; vertical-align: middle; cursor: pointer; padding-left: 4px; }
.outline-expander::before { content: ""; position: relative; font-family: Ionicons; display: inline-block; font-size: 8px; vertical-align: middle; }
.outline-item { padding-top: 3px; padding-bottom: 3px; cursor: pointer; }
.outline-expander:hover::before { content: ""; }
.outline-h1 > .outline-item { padding-left: 0px; }
.outline-h2 > .outline-item { padding-left: 1em; }
.outline-h3 > .outline-item { padding-left: 2em; }
.outline-h4 > .outline-item { padding-left: 3em; }
.outline-h5 > .outline-item { padding-left: 4em; }
.outline-h6 > .outline-item { padding-left: 5em; }
.outline-label { cursor: pointer; display: table-cell; vertical-align: middle; text-decoration: none; color: inherit; }
.outline-label:hover { text-decoration: underline; }
.outline-item:hover { border-color: rgb(245, 245, 245); background-color: var(--item-hover-bg-color); }
.outline-item:hover { margin-left: -28px; margin-right: -28px; border-left: 28px solid transparent; border-right: 28px solid transparent; }
.outline-item-single .outline-expander::before, .outline-item-single .outline-expander:hover::before { display: none; }
.outline-item-open > .outline-item > .outline-expander::before { content: ""; }
.outline-children { display: none; }
.info-panel-tab-wrapper { display: none; }
.outline-item-open > .outline-children { display: block; }
.typora-export .outline-item { padding-top: 1px; padding-bottom: 1px; }
.typora-export .outline-item:hover { margin-right: -8px; border-right: 8px solid transparent; }
.typora-export .outline-expander::before { content: "+"; font-family: inherit; top: -1px; }
.typora-export .outline-expander:hover::before, .typora-export .outline-item-open > .outline-item > .outline-expander::before { content: "−"; }
.typora-export-collapse-outline .outline-children { display: none; }
.typora-export-collapse-outline .outline-item-open > .outline-children, .typora-export-no-collapse-outline .outline-children { display: block; }
.typora-export-no-collapse-outline .outline-expander::before { content: "" !important; }
.typora-export-show-outline .outline-item-active > .outline-item .outline-label { font-weight: 700; }
.md-inline-math-container mjx-container { zoom: 0.95; }


.CodeMirror { height: auto; }
.CodeMirror.cm-s-inner { background: inherit; }
.CodeMirror-scroll { overflow: auto hidden; z-index: 3; }
.CodeMirror-gutter-filler, .CodeMirror-scrollbar-filler { background-color: rgb(255, 255, 255); }
.CodeMirror-gutters { border-right: 1px solid rgb(221, 221, 221); background: inherit; white-space: nowrap; }
.CodeMirror-linenumber { padding: 0px 3px 0px 5px; text-align: right; color: rgb(153, 153, 153); }
.cm-s-inner .cm-keyword { color: rgb(119, 0, 136); }
.cm-s-inner .cm-atom, .cm-s-inner.cm-atom { color: rgb(34, 17, 153); }
.cm-s-inner .cm-number { color: rgb(17, 102, 68); }
.cm-s-inner .cm-def { color: rgb(0, 0, 255); }
.cm-s-inner .cm-variable { color: rgb(0, 0, 0); }
.cm-s-inner .cm-variable-2 { color: rgb(0, 85, 170); }
.cm-s-inner .cm-variable-3 { color: rgb(0, 136, 85); }
.cm-s-inner .cm-string { color: rgb(170, 17, 17); }
.cm-s-inner .cm-property { color: rgb(0, 0, 0); }
.cm-s-inner .cm-operator { color: rgb(152, 26, 26); }
.cm-s-inner .cm-comment, .cm-s-inner.cm-comment { color: rgb(170, 85, 0); }
.cm-s-inner .cm-string-2 { color: rgb(255, 85, 0); }
.cm-s-inner .cm-meta { color: rgb(85, 85, 85); }
.cm-s-inner .cm-qualifier { color: rgb(85, 85, 85); }
.cm-s-inner .cm-builtin { color: rgb(51, 0, 170); }
.cm-s-inner .cm-bracket { color: rgb(153, 153, 119); }
.cm-s-inner .cm-tag { color: rgb(17, 119, 0); }
.cm-s-inner .cm-attribute { color: rgb(0, 0, 204); }
.cm-s-inner .cm-header, .cm-s-inner.cm-header { color: rgb(0, 0, 255); }
.cm-s-inner .cm-quote, .cm-s-inner.cm-quote { color: rgb(0, 153, 0); }
.cm-s-inner .cm-hr, .cm-s-inner.cm-hr { color: rgb(153, 153, 153); }
.cm-s-inner .cm-link, .cm-s-inner.cm-link { color: rgb(0, 0, 204); }
.cm-negative { color: rgb(221, 68, 68); }
.cm-positive { color: rgb(34, 153, 34); }
.cm-header, .cm-strong { font-weight: 700; }
.cm-del { text-decoration: line-through; }
.cm-em { font-style: italic; }
.cm-link { text-decoration: underline; }
.cm-error { color: red; }
.cm-invalidchar { color: red; }
.cm-constant { color: rgb(38, 139, 210); }
.cm-defined { color: rgb(181, 137, 0); }
div.CodeMirror span.CodeMirror-matchingbracket { color: rgb(0, 255, 0); }
div.CodeMirror span.CodeMirror-nonmatchingbracket { color: rgb(255, 34, 34); }
.cm-s-inner .CodeMirror-activeline-background { background: inherit; }
.CodeMirror { position: relative; overflow: hidden; }
.CodeMirror-scroll { height: 100%; outline: 0px; position: relative; box-sizing: content-box; background: inherit; }
.CodeMirror-sizer { position: relative; }
.CodeMirror-gutter-filler, .CodeMirror-hscrollbar, .CodeMirror-scrollbar-filler, .CodeMirror-vscrollbar { position: absolute; z-index: 6; display: none; outline: 0px; }
.CodeMirror-vscrollbar { right: 0px; top: 0px; overflow: hidden; }
.CodeMirror-hscrollbar { bottom: 0px; left: 0px; overflow: auto hidden; }
.CodeMirror-scrollbar-filler { right: 0px; bottom: 0px; }
.CodeMirror-gutter-filler { left: 0px; bottom: 0px; }
.CodeMirror-gutters { position: absolute; left: 0px; top: 0px; padding-bottom: 10px; z-index: 3; overflow-y: hidden; }
.CodeMirror-gutter { white-space: normal; height: 100%; box-sizing: content-box; padding-bottom: 30px; margin-bottom: -32px; display: inline-block; }
.CodeMirror-gutter-wrapper { position: absolute; z-index: 4; background: 0px 0px !important; border: none !important; }
.CodeMirror-gutter-background { position: absolute; top: 0px; bottom: 0px; z-index: 4; }
.CodeMirror-gutter-elt { position: absolute; cursor: default; z-index: 4; }
.CodeMirror-lines { cursor: text; }
.CodeMirror pre { border-radius: 0px; border-width: 0px; background: 0px 0px; font-family: inherit; font-size: inherit; margin: 0px; white-space: pre; overflow-wrap: normal; color: inherit; z-index: 2; position: relative; overflow: visible; }
.CodeMirror-wrap pre { overflow-wrap: break-word; white-space: pre-wrap; word-break: normal; }
.CodeMirror-code pre { border-right: 30px solid transparent; width: fit-content; }
.CodeMirror-wrap .CodeMirror-code pre { border-right: none; width: auto; }
.CodeMirror-linebackground { position: absolute; inset: 0px; z-index: 0; }
.CodeMirror-linewidget { position: relative; z-index: 2; overflow: auto; }
.CodeMirror-wrap .CodeMirror-scroll { overflow-x: hidden; }
.CodeMirror-measure { position: absolute; width: 100%; height: 0px; overflow: hidden; visibility: hidden; }
.CodeMirror-measure pre { position: static; }
.CodeMirror div.CodeMirror-cursor { position: absolute; visibility: hidden; border-right: none; width: 0px; }
.CodeMirror div.CodeMirror-cursor { visibility: hidden; }
.CodeMirror-focused div.CodeMirror-cursor { visibility: inherit; }
.cm-searching { background: rgba(255, 255, 0, 0.4); }
span.cm-underlined { text-decoration: underline; }
span.cm-strikethrough { text-decoration: line-through; }
.cm-tw-syntaxerror { color: rgb(255, 255, 255); background-color: rgb(153, 0, 0); }
.cm-tw-deleted { text-decoration: line-through; }
.cm-tw-header5 { font-weight: 700; }
.cm-tw-listitem:first-child { padding-left: 10px; }
.cm-tw-box { border-style: solid; border-right-width: 1px; border-bottom-width: 1px; border-left-width: 1px; border-color: inherit; border-top-width: 0px !important; }
.cm-tw-underline { text-decoration: underline; }
@media print {
  .CodeMirror div.CodeMirror-cursor { visibility: hidden; }
}


:root {
    --side-bar-bg-color: #fafafa;
    --control-text-color: #777;
}

@include-when-export url(https://fonts.loli.net/css?family=Open+Sans:400italic,700italic,700,400&subset=latin,latin-ext);

/* open-sans-regular - latin-ext_latin */
  /* open-sans-italic - latin-ext_latin */
    /* open-sans-700 - latin-ext_latin */
    /* open-sans-700italic - latin-ext_latin */
      html {
    font-size: 16px;
    -webkit-font-smoothing: antialiased;
}

body {
    font-family: "FZShuSong-Z01","FZKai-Z03","Open Sans","Clear Sans", "Helvetica Neue", Helvetica, Arial, 'Segoe UI Emoji', sans-serif;
    color: rgb(51, 51, 51);
    line-height: 1.6;
}

em {
    font-family: "FZKai-Z03";
    font-style: normal;
}

#write {
    max-width: 860px;
  	margin: 0 auto;
  	padding: 30px;
    padding-bottom: 100px;
}

@media only screen and (min-width: 1400px) {
	#write {
		max-width: 1024px;
	}
}

@media only screen and (min-width: 1800px) {
	#write {
		max-width: 1200px;
	}
}

#write > ul:first-child,
#write > ol:first-child{
    margin-top: 30px;
}

a {
    color: #4183C4;
}
h1,
h2,
h3,
h4,
h5,
h6 {
    position: relative;
    margin-top: 1rem;
    margin-bottom: 1rem;
    font-weight: bold;
    line-height: 1.4;
    cursor: text;
}
h1:hover a.anchor,
h2:hover a.anchor,
h3:hover a.anchor,
h4:hover a.anchor,
h5:hover a.anchor,
h6:hover a.anchor {
    text-decoration: none;
}
h1 tt,
h1 code {
    font-size: inherit;
}
h2 tt,
h2 code {
    font-size: inherit;
}
h3 tt,
h3 code {
    font-size: inherit;
}
h4 tt,
h4 code {
    font-size: inherit;
}
h5 tt,
h5 code {
    font-size: inherit;
}
h6 tt,
h6 code {
    font-size: inherit;
}
h1 {
    font-size: 2.25em;
    line-height: 1.2;
    border-bottom: 1px solid #eee;
}
h2 {
    font-size: 1.75em;
    line-height: 1.225;
    border-bottom: 1px solid #eee;
}

/*@media print {
    .typora-export h1,
    .typora-export h2 {
        border-bottom: none;
        padding-bottom: initial;
    }

    .typora-export h1::after,
    .typora-export h2::after {
        content: "";
        display: block;
        height: 100px;
        margin-top: -96px;
        border-top: 1px solid #eee;
    }
}*/

h3 {
    font-size: 1.5em;
    line-height: 1.43;
}
h4 {
    font-size: 1.25em;
}
h5 {
    font-size: 1em;
}
h6 {
   font-size: 1em;
    color: #777;
}
p,
blockquote,
ul,
ol,
dl,
table{
    margin: 0.8em 0;
}
li>ol,
li>ul {
    margin: 0 0;
}
hr {
    height: 2px;
    padding: 0;
    margin: 16px 0;
    background-color: #e7e7e7;
    border: 0 none;
    overflow: hidden;
    box-sizing: content-box;
}

li p.first {
    display: inline-block;
}
ul,
ol {
    padding-left: 30px;
}
ul:first-child,
ol:first-child {
    margin-top: 0;
}
ul:last-child,
ol:last-child {
    margin-bottom: 0;
}
blockquote {
    border-left: 4px solid #dfe2e5;
    padding: 0 15px;
    color: #777777;
}
blockquote blockquote {
    padding-right: 0;
}
table {
    padding: 0;
    word-break: initial;
}
table tr {
    border: 1px solid #dfe2e5;
    margin: 0;
    padding: 0;
}
table tr:nth-child(2n),
thead {
    background-color: #f8f8f8;
}
table th {
    font-weight: bold;
    border: 1px solid #dfe2e5;
    border-bottom: 0;
    margin: 0;
    padding: 6px 13px;
}
table td {
    border: 1px solid #dfe2e5;
    margin: 0;
    padding: 6px 13px;
}
table th:first-child,
table td:first-child {
    margin-top: 0;
}
table th:last-child,
table td:last-child {
    margin-bottom: 0;
}

.CodeMirror-lines {
    padding-left: 4px;
}

.code-tooltip {
    box-shadow: 0 1px 1px 0 rgba(0,28,36,.3);
    border-top: 1px solid #eef2f2;
}

.md-fences,
code,
tt {
    border: 1px solid #e7eaed;
    background-color: #f8f8f8;
    border-radius: 3px;
    padding: 2px 4px 0px 4px;
    font-size: 0.9em;
}

code {
    background-color: #f3f4f4;
    padding: 0 2px 0 2px;
}

.md-fences {
    margin-bottom: 15px;
    margin-top: 15px;
    padding-top: 8px;
    padding-bottom: 6px;
}


.md-task-list-item > input {
  margin-left: -1.3em;
}

@media print {
    html {
        font-size: 13px;
    }
    table,
    pre {
        page-break-inside: avoid;
    }
    pre {
        word-wrap: break-word;
    }
}

.md-fences {
	background-color: #f8f8f8;
}
#write pre.md-meta-block {
	padding: 1rem;
    font-size: 85%;
    line-height: 1.45;
    background-color: #f7f7f7;
    border: 0;
    border-radius: 3px;
    color: #777777;
    margin-top: 0 !important;
}

.mathjax-block>.code-tooltip {
	bottom: .375rem;
}

.md-mathjax-midline {
    background: #fafafa;
}

#write>h3.md-focus:before{
	left: -1.5625rem;
	top: .375rem;
}
#write>h4.md-focus:before{
	left: -1.5625rem;
	top: .285714286rem;
}
#write>h5.md-focus:before{
	left: -1.5625rem;
	top: .285714286rem;
}
#write>h6.md-focus:before{
	left: -1.5625rem;
	top: .285714286rem;
}
.md-image>.md-meta {
    /*border: 1px solid #ddd;*/
    border-radius: 3px;
    padding: 2px 0px 0px 4px;
    font-size: 0.9em;
    color: inherit;
}

.md-tag {
    color: #a7a7a7;
    opacity: 1;
}

.md-toc { 
    margin-top:20px;
    padding-bottom:20px;
}

.sidebar-tabs {
    border-bottom: none;
}

#typora-quick-open {
    border: 1px solid #ddd;
    background-color: #f8f8f8;
}

#typora-quick-open-item {
    background-color: #FAFAFA;
    border-color: #FEFEFE #e5e5e5 #e5e5e5 #eee;
    border-style: solid;
    border-width: 1px;
}

/** focus mode */
.on-focus-mode blockquote {
    border-left-color: rgba(85, 85, 85, 0.12);
}

header, .context-menu, .megamenu-content, footer{
    font-family: "Segoe UI", "Arial", sans-serif;
}

.file-node-content:hover .file-node-icon,
.file-node-content:hover .file-node-open-state{
    visibility: visible;
}

.mac-seamless-mode #typora-sidebar {
    background-color: #fafafa;
    background-color: var(--side-bar-bg-color);
}

.md-lang {
    color: #b4654d;
}

/*.html-for-mac {
    --item-hover-bg-color: #E6F0FE;
}*/

#md-notification .btn {
    border: 0;
}

.dropdown-menu .divider {
    border-color: #e5e5e5;
    opacity: 0.4;
}

.ty-preferences .window-content {
    background-color: #fafafa;
}

.ty-preferences .nav-group-item.active {
    color: white;
    background: #999;
}

.menu-item-container a.menu-style-btn {
    background-color: #f5f8fa;
    background-image: linear-gradient( 180deg , hsla(0, 0%, 100%, 0.8), hsla(0, 0%, 100%, 0)); 
}

figure figcaption {
  text-align: center;
  color: brown;
}

figure img {
  display: block;
  margin-left: auto;
  margin-right: auto;
}



mjx-container[jax="SVG"] {
  direction: ltr;
}

mjx-container[jax="SVG"] > svg {
  overflow: visible;
  min-height: 1px;
  min-width: 1px;
}

mjx-container[jax="SVG"] > svg a {
  fill: blue;
  stroke: blue;
}

mjx-assistive-mml {
  position: absolute !important;
  top: 0px;
  left: 0px;
  clip: rect(1px, 1px, 1px, 1px);
  padding: 1px 0px 0px 0px !important;
  border: 0px !important;
  display: block !important;
  width: auto !important;
  overflow: hidden !important;
  -webkit-touch-callout: none;
  -webkit-user-select: none;
  -khtml-user-select: none;
  -moz-user-select: none;
  -ms-user-select: none;
  user-select: none;
}

mjx-assistive-mml[display="block"] {
  width: 100% !important;
}

mjx-container[jax="SVG"][display="true"] {
  display: block;
  text-align: center;
  margin: 1em 0;
}

mjx-container[jax="SVG"][display="true"][width="full"] {
  display: flex;
}

mjx-container[jax="SVG"][justify="left"] {
  text-align: left;
}

mjx-container[jax="SVG"][justify="right"] {
  text-align: right;
}

g[data-mml-node="merror"] > g {
  fill: red;
  stroke: red;
}

g[data-mml-node="merror"] > rect[data-background] {
  fill: yellow;
  stroke: none;
}

g[data-mml-node="mtable"] > line[data-line], svg[data-table] > g > line[data-line] {
  stroke-width: 70px;
  fill: none;
}

g[data-mml-node="mtable"] > rect[data-frame], svg[data-table] > g > rect[data-frame] {
  stroke-width: 70px;
  fill: none;
}

g[data-mml-node="mtable"] > .mjx-dashed, svg[data-table] > g > .mjx-dashed {
  stroke-dasharray: 140;
}

g[data-mml-node="mtable"] > .mjx-dotted, svg[data-table] > g > .mjx-dotted {
  stroke-linecap: round;
  stroke-dasharray: 0,140;
}

g[data-mml-node="mtable"] > g > svg {
  overflow: visible;
}

[jax="SVG"] mjx-tool {
  display: inline-block;
  position: relative;
  width: 0;
  height: 0;
}

[jax="SVG"] mjx-tool > mjx-tip {
  position: absolute;
  top: 0;
  left: 0;
}

mjx-tool > mjx-tip {
  display: inline-block;
  padding: .2em;
  border: 1px solid #888;
  font-size: 70%;
  background-color: #F8F8F8;
  color: black;
  box-shadow: 2px 2px 5px #AAAAAA;
}

g[data-mml-node="maction"][data-toggle] {
  cursor: pointer;
}

mjx-status {
  display: block;
  position: fixed;
  left: 1em;
  bottom: 1em;
  min-width: 25%;
  padding: .2em .4em;
  border: 1px solid #888;
  font-size: 90%;
  background-color: #F8F8F8;
  color: black;
}

foreignObject[data-mjx-xml] {
  font-family: initial;
  line-height: normal;
  overflow: visible;
}

mjx-container[jax="SVG"] path[data-c], mjx-container[jax="SVG"] use[data-c] {
  stroke-width: 3;
}

g[data-mml-node="xypic"] path {
  stroke-width: inherit;
}

.MathJax g[data-mml-node="xypic"] path {
  stroke-width: inherit;
}
mjx-container[jax="SVG"] path[data-c], mjx-container[jax="SVG"] use[data-c] {
							stroke-width: 0;
						}
</style><title>index</title>
</head>
<body class='typora-export'><div class='typora-export-content'>
<div id='write'  class=''><div class='md-toc' mdtype='toc'><p class="md-toc-content" role="list"><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n2"><a class="md-toc-inner" href="#flink流处理简介">Introduction to Flink Stream Processing</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n14"><a class="md-toc-inner" href="#flink是什么">What Is Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n24"><a class="md-toc-inner" href="#flink目前在国内企业的应用">Current Use of Flink in Chinese Enterprises</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n29"><a class="md-toc-inner" href="#为什么选择flink">Why Choose Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n46"><a class="md-toc-inner" href="#哪些行业需要处理流数据">Which Industries Need to Process Streaming Data</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n76"><a class="md-toc-inner" href="#数据处理架构">Data Processing Architectures</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n87"><a class="md-toc-inner" href="#oltp">OLTP</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n93"><a class="md-toc-inner" href="#olap">OLAP</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n99"><a class="md-toc-inner" href="#lambda架构">Lambda Architecture</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n121"><a class="md-toc-inner" href="#有状态的流处理">Stateful Stream Processing</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n145"><a class="md-toc-inner" href="#流处理框架的演变">Evolution of Stream Processing Frameworks</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n176"><a class="md-toc-inner" href="#大数据理论的发展">Development of Big Data Theory</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n184"><a class="md-toc-inner" href="#flink的主要特点">Key Features of Flink</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n207"><a class="md-toc-inner" href="#事件驱动event-driven）">Event-Driven</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n222"><a class="md-toc-inner" 
href="#基于流的世界观">A Stream-Based Worldview</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n228"><a class="md-toc-inner" href="#时间维度">Time Dimension</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n234"><a class="md-toc-inner" href="#分层api">Layered APIs</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n243"><a class="md-toc-inner" href="#flink中最重要的两个底层api">The Two Most Important Low-Level APIs in Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n249"><a class="md-toc-inner" href="#学习flink最重要的两个数据结构">The Two Most Important Data Structures for Learning Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n255"><a class="md-toc-inner" href="#flink中最重要的三个核心概念">The Three Most Important Core Concepts in Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n264"><a class="md-toc-inner" href="#分布式系统重要概念">Important Concepts in Distributed Systems</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n307"><a class="md-toc-inner" href="#flink-vs-spark-streaming">Flink vs Spark Streaming</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n336"><a class="md-toc-inner" href="#flink程序的典型结构">Typical Structure of a Flink Program</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n350"><a class="md-toc-inner" href="#flink运行时架构">Flink Runtime Architecture</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n351"><a class="md-toc-inner" href="#flink主从架构">Flink Master-Worker Architecture</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n364"><a class="md-toc-inner" href="#作业管理器">JobManager</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n375"><a class="md-toc-inner" href="#任务管理器">TaskManager</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n387"><a class="md-toc-inner" href="#任务插槽">Task Slots</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n403"><a class="md-toc-inner" href="#并行度的设置">Setting Parallelism</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n416"><a class="md-toc-inner" 
href="#并行度设置的最佳实践">Best Practices for Setting Parallelism</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n431"><a class="md-toc-inner" href="#任务提交流程">Job Submission Flow</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n438"><a class="md-toc-inner" href="#flink中的dag数据结构">DAG Data Structures in Flink</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n479"><a class="md-toc-inner" href="#datastream-api">DataStream API</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n480"><a class="md-toc-inner" href="#自定义数据源">Custom Data Sources</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n481"><a class="md-toc-inner" href="#pojo-class">POJO Class</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n491"><a class="md-toc-inner" href="#sourcefunction">SourceFunction</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n515"><a class="md-toc-inner" href="#基本转换算子">Basic Transformation Operators</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n530"><a class="md-toc-inner" href="#逻辑分区算子">Logical Partitioning Operators</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n560"><a class="md-toc-inner" href="#reduce算子如何维护逻辑分区">How the reduce Operator Maintains Logical Partitions</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n573"><a class="md-toc-inner" href="#物理分区算子">Physical Partitioning Operators</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n588"><a class="md-toc-inner" href="#富函数">Rich Functions</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n612"><a class="md-toc-inner" href="#自定义输出">Custom Sinks</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n615"><a class="md-toc-inner" href="#底层api">Low-Level APIs</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n617"><a class="md-toc-inner" href="#processfunction">ProcessFunction</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n626"><a class="md-toc-inner" 
href="#keyedprocessfunction">KeyedProcessFunction</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n658"><a class="md-toc-inner" href="#逻辑分区维护的状态-键控状态变量">State Maintained per Logical Partition: Keyed State</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n667"><a class="md-toc-inner" href="#valuestate-值状态变量">ValueState: Value State</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n686"><a class="md-toc-inner" href="#liststate-列表状态变量">ListState: List State</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n695"><a class="md-toc-inner" href="#mapstate-字典状态变量">MapState: Map State</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n706"><a class="md-toc-inner" href="#窗口api">Window API</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n707"><a class="md-toc-inner" href="#processwindowfunction">ProcessWindowFunction</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n738"><a class="md-toc-inner" href="#aggregatefunction">AggregateFunction</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n751"><a class="md-toc-inner" href="#将aggregatefunction和processwindowfunction结合使用">Combining AggregateFunction with ProcessWindowFunction</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n755"><a class="md-toc-inner" href="#窗口的底层实现">How Windows Are Implemented Under the Hood</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n761"><a class="md-toc-inner" href="#processallwindowfunction">ProcessAllWindowFunction</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n765"><a class="md-toc-inner" href="#触发器">Triggers</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n791"><a class="md-toc-inner" href="#窗口">Windows</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n792"><a class="md-toc-inner" href="#窗口概念">Window Concepts</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n800"><a class="md-toc-inner" 
href="#窗口的本质">The Essence of Windows</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n804"><a class="md-toc-inner" href="#窗口类型">Window Types</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n822"><a class="md-toc-inner" href="#滚动窗口">Tumbling Windows</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n829"><a class="md-toc-inner" href="#滑动窗口">Sliding Windows</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n839"><a class="md-toc-inner" href="#会话窗口">Session Windows</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n850"><a class="md-toc-inner" href="#逻辑时钟-水位线">Logical Clock: Watermarks</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n877"><a class="md-toc-inner" href="#有关水位线的一些约定">Conventions About Watermarks</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n900"><a class="md-toc-inner" href="#多流转换时水位线的传播机制">Watermark Propagation Across Multi-Stream Transformations</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n912"><a class="md-toc-inner" href="#水位线设置的最佳实践">Best Practices for Configuring Watermarks</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n918"><a class="md-toc-inner" href="#kafka水位线设置">Configuring Watermarks for Kafka</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n921"><a class="md-toc-inner" href="#如何处理迟到数据">Handling Late Data</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n947"><a class="md-toc-inner" href="#多流合并">Merging Multiple Streams</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n1003"><a class="md-toc-inner" href="#flink中的状态变量">State in Flink</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1028"><a class="md-toc-inner" href="#flink中的状态管理">State Management in Flink</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n1029"><a class="md-toc-inner" href="#状态后端">State Backends</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n1044"><a class="md-toc-inner" href="#选择一个状态后端">Choosing a State Backend</a></span><span role="listitem" class="md-toc-item md-toc-h1" 
data-ref="n1067"><a class="md-toc-inner" href="#flink的容错机制">Flink's Fault Tolerance Mechanism</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1068"><a class="md-toc-inner" href="#flink程序如何从检查点恢复程序">How a Flink Program Recovers from a Checkpoint</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1082"><a class="md-toc-inner" href="#flink如何保存检查点">How Flink Takes Checkpoints</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n1134"><a class="md-toc-inner" href="#端到端一致性">End-to-End Consistency</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1135"><a class="md-toc-inner" href="#状态一致性分类">Categories of State Consistency</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1152"><a class="md-toc-inner" href="#端到端状态一致性">End-to-End State Consistency</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1159"><a class="md-toc-inner" href="#端到端exactly-once一致性保障">End-to-End Exactly-Once Guarantees</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1173"><a class="md-toc-inner" href="#幂等写入">Idempotent Writes</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1175"><a class="md-toc-inner" href="#事务写入">Transactional Writes</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1193"><a class="md-toc-inner" href="#预写式日志">Write-Ahead Log</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1201"><a class="md-toc-inner" href="#两阶段提交">Two-Phase Commit</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1214"><a class="md-toc-inner" href="#总结">Summary</a></span><span role="listitem" class="md-toc-item md-toc-h2" data-ref="n1236"><a class="md-toc-inner" href="#kafka--flink--kafka端到端一致性">Kafka ➡️ Flink ➡️ Kafka End-to-End Consistency</a></span><span role="listitem" class="md-toc-item md-toc-h3" data-ref="n1244"><a class="md-toc-inner" href="#两阶段提交步骤">Two-Phase Commit Steps</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n1258"><a class="md-toc-inner" href="#背压问题">Backpressure</a></span><span role="listitem" class="md-toc-item md-toc-h1" data-ref="n1278"><a 
class="md-toc-inner" href="#有限状态机">Finite State Machines</a></span></p></div><h1 id='flink流处理简介'><span>Introduction to Stream Processing with Flink</span></h1><ul><li><span>What Flink is</span></li><li><span>Why use Flink</span></li><li><span>The development and evolution of stream processing</span></li><li><span>Key features of Flink</span></li><li><span>Flink vs Spark Streaming</span></li></ul><h2 id='flink是什么'><span>What Is Flink</span></h2><p><span>Apache Flink is a framework and </span><span style="color:red"><span>distributed</span></span><span> processing engine for </span><span style="color:red"><span>stateful</span></span><span> computations over unbounded and bounded data </span><span style="color:red"><span>streams</span></span><span>.</span></p><p><strong><span>Bounded data streams</span></strong></p><p><span>Flink treats a file as a stream and processes its records one at a time. For example, the two-record CSV file below is processed record by record rather than as a whole file.</span></p><pre class="md-fences md-end-block" spellcheck="false" lang="csv">Mary,./home,1970-01-01 00:00:01
Bob,./cart,1970-01-01 00:00:02</pre><p><strong><span>Unbounded data streams</span></strong></p><p><span>For example, the Flink Kafka Consumer: each record is processed as it arrives.</span></p><p><strong><span>Unified stream and batch processing</span></strong></p><p><span>Flink's underlying engine applies the same processing logic to files and to Kafka message queues.</span></p><p><span>With stream and batch unified, a single set of business logic covers both.</span></p><h2 id='flink目前在国内企业的应用'><span>Flink Adoption at Chinese Companies</span></h2><p><span>The major Chinese tech companies all rely heavily on Flink. Alibaba in particular acquired the company behind Flink and has contributed a large amount of code to the project; all the metrics on its Double 11 (Singles' Day) dashboard are computed with Flink. According to figures from a few years ago, Alibaba reportedly processed 4.6 PB of data per second with Flink.</span></p><p><span>Kuaishou: a Flink cluster of 1,500 machines</span></p><p><span>ByteDance: Apache Storm </span>➡️<span> Apache Flink</span></p><p><span>Flink outclasses the traditional stream processing frameworks across the board.</span></p><h2 id='为什么选择flink'><span>Why Choose Flink</span></h2><ul><li><p><span>Streaming data reflects the way we live more faithfully (for example, clickstreams).</span></p></li><li><p><span>Traditional data architectures are built around finite data sets, so data streams are artificially turned into offline data before processing.</span></p></li><li><p><span>Our goals</span></p><ul><li><span>Low latency (Spark Streaming's latency is at the </span><span style="color:red"><span>second level</span></span><span> because it must accumulate a micro-batch before computing; Flink's latency is at the </span><span style="color:red"><span>millisecond level</span></span><span>, roughly a thousandth of Spark Streaming's, because Flink processes each record as it arrives with no batching step. Since operating-system software clocks typically have millisecond resolution, Flink's latency can be regarded as negligible in practice.)</span></li><li><span>High throughput (scale out by adding machines)</span></li><li><span>Accurate results and good fault tolerance (EXACTLY-ONCE: each record affects the result exactly once)</span></li></ul></li></ul><blockquote><p><span>Spark Streaming requires the window size to be an integer multiple of the batch interval (for example, 500 milliseconds, i.e. 0.5 seconds).</span>
<span>Latency: the interval between when data arrives and when the computed result is available.</span>
<span>Throughput: the amount of data that can be processed per unit of time.</span>
<span>Good fault tolerance: if the program crashes and restarts, the results are still correct.</span>
<span>Accuracy of results: suppose an event occurred at 2020-01-01 00:00:09,</span>
<span>and we use 10-second tumbling windows, which gives these windows:</span>
<span>Window 1: 2020-01-01 00:00:00 ~ 2020-01-01 00:00:10</span>
<span>Window 2: 2020-01-01 00:00:10 ~ 2020-01-01 00:00:20</span>
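<span>A minimal sketch of how a timestamp maps to one of the 10-second windows above, written in plain Python rather than Flink or Spark code. The helper name tumbling_window and the millisecond offsets are assumptions made for illustration; the window bounds are treated as start-inclusive and end-exclusive, and the arrival time of second 11 is the one used in this example.</span>

```python
def tumbling_window(timestamp_ms, size_ms):
    """Return (start, end) of the tumbling window that contains timestamp_ms."""
    start = timestamp_ms - timestamp_ms % size_ms  # align down to a multiple of size
    return (start, start + size_ms)


SIZE_MS = 10_000          # 10-second windows
event_time_ms = 9_000     # the event occurred at 00:00:09
arrival_time_ms = 11_000  # it reached the server at 00:00:11

by_event_time = tumbling_window(event_time_ms, SIZE_MS)
by_arrival_time = tumbling_window(arrival_time_ms, SIZE_MS)
print(by_event_time, by_arrival_time)
```

<span>Assigned by the event's own timestamp, the record falls into Window 1; assigned by its arrival time, it falls into Window 2.</span>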
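<span>Because of network delay, the event reaches the Spark Streaming server at</span>
<span>2020-01-01 00:00:11 and is assigned to Window 2, which makes the result inaccurate. Flink does not have this problem: </span><span style="color:red"><span>Flink can process events according to the time at which they actually occurred</span></span><span>.</span></p></blockquote><h2 id='哪些行业需要处理流数据'><span>Which Industries Need Stream Processing</span></h2><ul><li><p><span>Every industry needs to process streaming data, because data is inherently produced as a stream.</span></p></li><li><p><span>E-commerce</span></p><ul><li><span>PV (Page View): counting page visits</span></li><li><span>UV (Unique Visitor): counting distinct visitors</span></li><li><span>Metrics such as real-time top products (every 5 minutes, compute the products with the most page views over the past hour).</span></li></ul></li><li><p><span>Internet of Things (IoT)</span></p><ul><li><span>Detecting a temperature sensor whose readings rise continuously for 1 second.</span></li></ul></li><li><p><span>Risk control</span></p><ul><li><span>Detecting three consecutive failed logins (crawler behavior, brute-forcing usernames and passwords)</span></li><li><span>Credit card fraud detection (two consecutive purchases, the first under 1 yuan and the second over 500 yuan)</span></li><li><span>Detecting orders that are not paid in time (for example, a Meituan order is closed if it is not paid within 15 minutes)</span></li><li><span>Fake-order behavior (three consecutive events: log in </span>➡️<span> place order </span>➡️<span> pay)</span></li></ul></li></ul><h2 id='数据处理架构'><span>Data Processing Architectures</span></h2><ul><li><span>OLTP (Online Transaction Processing)</span></li><li><span>OLAP (Online Analytical Processing)</span></li><li><span>Lambda architecture</span></li><li><span>Stateful stream processing</span></li></ul><figure><img src="figure/5.svg" alt="Architecture evolution" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 1: Architecture evolution</figcaption></figure><h3 id='oltp'><span>OLTP</span></h3><p><span>A single relational database (Oracle, IBM DB2, MySQL) serves every requirement.</span></p><p><span>Pros: the architecture is simple and suits start-ups.</span></p><p><span>Cons: tight coupling and poor performance.</span></p><p><span>A database administrator (DBA) is responsible for tuning the database (sharding databases and tables, building indexes).</span></p><figure><img src="figure/6.svg" alt="OLTP architecture" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 2: OLTP architecture</figcaption></figure><h3 id='olap'><span>OLAP</span></h3><p><span>Pros: transactional and analytical workloads are decoupled.</span></p><p><span>Cons: results arrive with considerable delay and are not real-time.</span></p><p><span>Relational database: user table, user address table, order table</span></p><p><span>Metric computation: Hive</span></p><figure><img src="figure/7.svg" alt="OLAP architecture" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 3: OLAP architecture</figcaption></figure><h3 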
id='lambda架构'><span>Lambda Architecture</span></h3><blockquote><p>📝<span>"Lambda" here has nothing to do with lambda expressions.</span></p></blockquote><ul><li><p><span>Use </span><span style="color:red"><span>two</span></span><span> systems to get both low latency and accurate results.</span></p><ul><li><span>A batch processing framework (Hive) guarantees accurate results, but the results may arrive with considerable delay.</span></li><li><span>A stream processing framework (Spark Streaming) guarantees low latency, but the results may be inaccurate.</span></li></ul></li></ul><figure><img src="figure/3.svg" alt="Lambda architecture" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 4: Lambda architecture</figcaption></figure><p><span>Example:</span></p><blockquote><p><span>Click event: Mary,./Home,1970-01-01 00:00:09</span>
<span>Compute the PV metric for the window 1970-01-01 00:00:00 ~ 1970-01-01 00:00:10.</span>
<span>Because of network delay, the event reaches the server at second 11, so Spark Streaming assigns it to the window 1970-01-01 00:00:10 ~ 1970-01-01 00:00:20.</span>
<span>As a result, the count for the 0 ~ 10 window is short by one record. So at some later point (for example, at midnight) we can use Hive to recompute the page views for the 0 ~ 10 window, correct the result, and write it to the database.</span></p><ul><li><span>Spark Streaming: guarantees low latency, but the result may be inaccurate</span></li><li><span>Hive: guarantees an accurate result, but with higher latency</span>
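<span>Both systems compute the same metric (page views for the 0 ~ 10 window), so the code is very similar. This makes maintenance harder, because the same logic has to be written twice.</span>
<span>Can a single codebase deliver both low latency and accurate results?</span>
<span>Can a single framework guarantee both low latency and accurate results?</span>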
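<span>The answer is Flink.</span></li></ul></blockquote><figure><img src="figure/4.svg" alt="Stateful stream processing" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 5: Stateful stream processing</figcaption></figure><h2 id='有状态的流处理'><span>Stateful Stream Processing</span></h2><p><span>State: some operators maintain internal state.</span></p><p><span>Stream processing: each record is processed as it arrives (no batching).</span></p><p><span>Diagram: each incoming record increments a count.</span></p><ol start='' ><li><span>A record (circle) arrives and triggers the Flink application.</span></li><li><span>Read the internal state (the count).</span></li><li><span>Add 1 to the count.</span></li><li><span>Write the count back to the internal state.</span></li><li><span>Emit the count (square).</span></li></ol><p><span>As the diagram shows, Flink does not write to disk while processing records.</span></p><p><span>It writes to disk only when saving a checkpoint (a snapshot of local state).</span></p><figure><img src="figure/8.svg" alt="Stateful stream processing in Flink" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 6: Stateful stream processing in Flink</figcaption></figure><p><span>A data stream is like a production line:</span></p><ul><li><span>Records are the products on the line.</span></li><li><span>Operators are the workers on the line.</span></li></ul><h2 id='流处理框架的演变'><span>Evolution of Stream Processing Frameworks</span></h2><ul><li><p><span>First generation: Apache Storm</span></p><ul><li><span>Low latency</span></li><li><span>Poor throughput</span></li><li><span>No guarantee that results are accurate</span></li></ul></li><li><p><span>Second generation: Apache Spark Streaming</span></p><ul><li><span>High throughput</span></li><li><span>Relatively high latency (seconds)</span></li><li><span>No guarantee that results are accurate</span></li></ul></li><li><p><span>Third generation: Apache Flink</span></p><ul><li><span>Low latency</span></li><li><span>High throughput</span></li><li><span>Correct handling of time and semantic windows</span></li><li><span>Correct results (EXACTLY-ONCE)</span></li></ul></li></ul><h2 id='大数据理论的发展'><span>Development of Big Data Theory</span></h2><ul><li><span>Google's three papers (2003 to 2006): GFS </span>➡️<span> HDFS, MapReduce </span>➡️<span> Hadoop MapReduce, Bigtable </span>➡️<span> HBase</span></li><li><span>Spark (started in 2009) and Spark Streaming (moved computation from disk to memory, for speedups of up to a hundredfold)</span></li><li><span>Google (2015): the Dataflow Model </span>➡️<span> Apache Flink (late 2015)</span></li></ul><h2 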
id='flink的主要特点'><span>Key Features of Flink</span></h2><ul><li><p><span>Event-driven</span></p></li><li><p><span>A stream-based worldview</span></p></li><li><p><span>Layered APIs</span></p></li><li><p><span>Support for both event-time (EventTime) and processing-time (ProcessingTime) semantics</span></p><ul><li><span>Event time (logical clock): the time at which the event actually occurred; the timestamp is carried in the event itself</span></li><li><span>Processing time (physical clock): the machine time at which the event reaches the server</span></li></ul></li><li><p><span>Exactly-once (EXACTLY-ONCE) state consistency guarantees</span></p></li><li><p><span>Low latency: millions of events per second at millisecond-level latency</span></p></li><li><p><span>Connectors to many commonly used storage systems (ES, HBase, MySQL, Redis…)</span></p></li><li><p><span>High availability (ZooKeeper) and dynamic scaling, enabling 7×24 around-the-clock operation</span></p></li></ul><h3 id='事件驱动event-driven）'><span>Event-Driven</span></h3><ul><li><span>Each record is processed as it arrives: every incoming record drives the operators in the DAG (directed acyclic graph). You can also think of the data as flowing through the DAG.</span></li></ul><figure><img src="figure/20.svg" alt="Event-driven processing" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 7: Event-driven processing</figcaption></figure><ul><li><span>As soon as an event arrives it drives the MAP operator. Once MAP has processed the event, it sends the transformed (ETL) record to the FILTER operator, which runs immediately in turn, and so on.</span></li><li><span>Because Flink performs stateful stream processing, some operators, such as REDUCE, maintain and update internal state, whereas MAP and FILTER are stateless computations.</span></li><li><span>Traditional batch processing looks like this:</span></li></ul><figure><img src="figure/21.svg" alt="Traditional batch processing" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 8: Traditional batch processing</figcaption></figure><blockquote><p><strong><span>Stream processing</span></strong>
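<span>Each record is processed as it arrives.</span>
<span>When is the flatMap operator triggered? When its input arrives.</span>
<span>When is the sum operator triggered? Also when input arrives: every incoming record triggers one execution of sum.</span>
<span>Operators always run </span><span style="color:red"><span>passively</span></span><span>: if no data arrives, nothing runs.</span>
<span>This property is called event-driven execution: an operator's input events drive its execution.</span>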
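<span>A minimal sketch of this event-driven behavior in plain Python, not Flink API code. The class CountPerKey and the function run are hypothetical names introduced for illustration, and the dict only stands in for an operator's internal state.</span>

```python
class CountPerKey:
    """Toy stateful operator: keeps one running count per key."""

    def __init__(self):
        self.state = {}  # maps each key to its count (stand-in for operator state)

    def process_element(self, key):
        count = self.state.get(key, 0)  # read the internal state
        count += 1                      # update the count
        self.state[key] = count         # write the state back
        return (key, count)             # emit the result downstream


def run(events, operator):
    # The operator never polls for work; each arriving event drives one call.
    return [operator.process_element(event) for event in events]


outputs = run(["Mary", "Bob", "Mary"], CountPerKey())
print(outputs)
```

<span>With no input events, process_element is never called and the state stays empty, which is the passive execution described above.</span>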
<span>Think of a worker (operator) on a production line: the arrival of a product from upstream triggers the worker's action.</span></p></blockquote><h3 id='基于流的世界观'><span>A Stream-Based Worldview</span></h3><ul><li><span>In Flink's worldview everything is made of streams: offline data is a bounded stream and real-time data is an unbounded stream. These are the so-called bounded and unbounded streams.</span></li><li><span>In Spark Streaming's worldview everything is made of batches: offline data is one batch and real-time data is an endless series of micro-batches.</span></li></ul><h3 id='时间维度'><span>The Time Dimension</span></h3><ul><li><span>The most important point of the stream worldview is that it adds one dimension to static offline data: time.</span></li><li><span>An analogy from physics: batch processing resembles Newtonian mechanics (coordinates x, y, z), while stream processing resembles special relativity (coordinates x, y, z, t).</span></li></ul><h3 id='分层api'><span>Layered APIs</span></h3><figure><img src="figure/22.svg" alt="API layers" style="width:50%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 9: API layers</figcaption></figure><ul><li><span>The more abstract the API, the easier it is to use, but the less able it is to express complex requirements.</span></li><li><span>The lower-level the API, the harder it is to master, but the more complex the requirements it can implement.</span></li><li><span>Our focus: the DataStream API and process functions.</span></li></ul><h2 id='flink中最重要的两个底层api'><span>The Two Most Important Low-Level APIs in Flink</span></h2><p><strong><span>For processing a single stream</span></strong></p><pre class="md-fences md-end-block" spellcheck="false" lang="">KeyedProcessFunction</pre><p><span>KeyedProcessFunction can implement both flatMap and reduce.</span></p><p><strong><span>For joining two streams</span></strong></p><pre class="md-fences md-end-block" spellcheck="false" lang="">CoProcessFunction</pre><h2 id='学习flink最重要的两个数据结构'><span>The Two Most Important Data Structures for Learning Flink</span></h2><ul><li><span>Directed acyclic graph (DAG): vertices and directed edges.</span></li><li><span>Hash table (HashMap)</span></li></ul><h2 id='flink中最重要的三个核心概念'><span>The Three Core Concepts of Flink</span></h2><p><span>To understand what Flink really is, you only need to grasp these three concepts thoroughly:</span></p><ul><li><span>Time semantics: event time, the logical clock (watermarks), and semantic windows</span></li><li><span>State: the difference between stateful and stateless operators</span></li><li><span>Event-driven execution: each record is processed as it arrives, and operator logic in Flink is always executed passively.</span></li></ul><h2 
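id='分布式系统重要概念'><span>Important concepts in distributed systems</span></h2><ul><li><p><span>Partitions: what is the difference between physical and logical partitions?</span></p><ul><li><span>Physical partition in Hadoop: a machine node or container</span></li><li><span>Logical partition in Hadoop: the group of data belonging to each key</span></li><li><span>Physical partition in Flink: the thread occupied by a parallel subtask of an operator</span></li><li><span>Logical partition in Flink: the data or accumulator belonging to each key</span></li><li><span>All data of one logical partition always resides in the same physical partition</span></li><li><span>Records with the same key are always processed on the same node</span></li><li><span>The essence of data skew: the logical partition of some key holds so much data that the resources (CPU, memory, disk space) of its physical partition run out.</span></li></ul></li><li><p><span>Clocks: what is the difference between physical and logical clocks?</span></p><ul><li><span>Physical clock in Flink: machine time (it physically exists; it is the clock signal generated by the CPU)</span></li><li><span>Logical clock in Flink: the watermark (a clock produced by the programmer's code)</span></li></ul></li><li><p><span>The difference between synchronous and asynchronous execution</span></p><ul><li><p><span>Synchronous: the order of execution is deterministic</span></p><ul><li><span>Synchronous execution can be enforced with mutexes or transactions.</span></li></ul></li><li><p><span>Asynchronous: the order of execution is nondeterministic</span></p><ul><li><span>Multiple processes</span></li><li><span>Multiple threads</span></li><li><span>Single-threaded asynchronous I/O (I/O multiplexing)</span></li></ul></li></ul></li></ul><h2 id='flink-vs-spark-streaming'><span>Flink vs Spark Streaming</span></h2><ul><li><p><span>Streaming </span>🆚<span> micro-batching</span></p></li><li><p><span>Event-driven </span>🆚<span> not event-driven</span></p></li><li><p><span>Data model</span></p><ul><li><span>Spark: RDDs. A DStream in Spark Streaming is in fact a sequence of RDDs, each holding one small batch of data.</span></li><li><span>Flink: the basic data model is the data stream, a sequence of events (Integer, String, Long, POJO classes, Tuples).</span></li></ul></li></ul><blockquote><p>📝<span>POJO classes are used to mimic Scala's case classes.</span></p></blockquote><ul><li><p><span>Runtime architecture</span></p><ul><li><span>Spark is batch computation: it splits the DAG into stages, and a stage must finish before the next one can start.</span></li><li><span>Flink uses a true streaming execution model: once an event has been processed at one node, it can be sent straight to the next node for processing.</span></li></ul></li><li><p><span>Spark Streaming's latency is roughly 1000 times that of Flink (on the order of seconds, bounded below by the batch interval, versus milliseconds).</span></p></li><li><p><span>Flink supports both event time and processing time; Spark Streaming (the DStream API) supports only processing time.</span></p></li><li><p><span>Flink supports session windows.</span></p></li></ul><h2 id='flink程序的典型结构'><span>Typical structure of a Flink program</span></h2><ol start='' ><li><span>Obtain the stream execution environment</span></li><li><span>Set the number of parallel subtasks of the operators (the parallelism)</span></li><li><span>Read the data source</span></li><li><span>Perform the computation</span></li><li><span>Output the results</span></li><li><span>Submit and execute the program</span></li></ol><h1 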
id='flink运行时架构'><span>Flink runtime architecture</span></h1><h2 id='flink主从架构'><span>Flink's master-slave architecture</span></h2><ul><li><p><span>The Flink runtime consists of two types of processes:</span></p><ul><li><span>One JobManager (the master process)</span></li><li><span>One or more TaskManagers (the slave processes).</span></li></ul></li><li><p><span>This is a typical master-slave architecture.</span></p></li></ul><figure><img src="figure/23.svg" alt="Flink's master-slave architecture" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 10: Flink's master-slave architecture</figcaption></figure><h2 id='作业管理器'><span>JobManager</span></h2><p><span>The JobManager is a JVM process. It contains three kinds of threads:</span></p><ul><li><span>Flink's ResourceManager: the resource it manages is the task slots in the TaskManagers</span></li><li><span>Dispatcher (WebUI): used to submit jobs and to monitor the cluster and its jobs</span></li><li><span>JobMaster (the main thread of a job, one per job): schedules tasks and deploys the DAG to the TaskManagers</span></li></ul><figure><img src="figure/24.svg" alt="The three kinds of threads in the JobManager" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 11: The three kinds of threads in the JobManager</figcaption></figure><p><span>Because there is one JobMaster per job, several JobMaster threads may exist at the same time.</span></p><h2 id='任务管理器'><span>TaskManager</span></h2><ul><li><span>A TaskManager is also a JVM process. It contains at least one task slot.</span></li><li><span>The task slot is the smallest unit of computing resources in Flink.</span></li><li><span>Each task slot is a slice of memory: every slot occupies its own segment of memory.</span></li><li><span>Each task slot runs at least one thread.</span></li><li><span>Managed memory per task slot = managed memory of the TaskManager </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.296ex" role="img" focusable="false" viewBox="0 -537 778 573" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.081ex;"><defs><path id="MJX-22-TEX-N-F7" d="M318 466Q318 500 339 518T386 537Q418 537 438 517T458 466Q458 438 440 417T388 396Q355 396 337 417T318 466ZM56 237T56 250T70 270H706Q721 262 721 250T706 230H70Q56 237 56 250ZM318 34Q318 68 339 86T386 105Q418 105 438 85T458 34Q458 6 440 -15T388 -36Q355 -36 337 -15T318 34Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" 
transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="F7" xlink:href="#MJX-22-TEX-N-F7"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>÷</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\div</script><span> number of task slots</span></li></ul><h2 id='任务插槽'><span>Task slots</span></h2><ul><li><span>Parallel subtasks of different operators can share one task slot.</span></li><li><span>Different parallel subtasks of the same operator cannot share a task slot.</span></li><li><span>If an operator has parallelism N, it has N parallel subtasks and must occupy N task slots.</span></li></ul><p><span>Given the following program (pseudocode) and 6 task slots, one possible scheduling of the tasks is shown in the figure below:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>5</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background 
CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">source</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">map</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span>.<span class="cm-variable">window</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">reduce</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div 
class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">sink</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 173px;"></div><div class="CodeMirror-gutters" style="height: 173px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><figure><img src="figure/tasks_slots.svg" alt="One thread per task slot" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 12: One thread per task slot</figcaption></figure><p><span>Given the following program (pseudocode) and 6 task slots, one possible scheduling is shown in the figure below:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div 
class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>5</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">source</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">6</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">map</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">6</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span>.<span class="cm-variable">window</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" 
CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">reduce</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">6</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">sink</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 173px;"></div><div class="CodeMirror-gutters" style="height: 173px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><figure><img src="figure/slot_sharing.svg" alt="Multiple threads per task slot" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 13: Multiple threads per task slot</figcaption></figure><figure><img src="figure/slots_parallelism.svg" alt="The relationship between the number of task slots and the parallelism" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 14: The relationship between the number of task slots and the parallelism</figcaption></figure><h2 id='并行度的设置'><span>Setting the parallelism</span></h2><p><span>Parallelism here means the parallelism of an operator.</span></p><p><span>An operator's parallelism cannot exceed the number of task slots available in the cluster.</span></p><p><span>Parallelism can be set in four places, listed here from lowest to highest priority.</span></p><ol start='' ><li><span>In the configuration file </span><code>flink-conf.yaml</code><span>, through the entry </span><code>parallelism.default: 1</code></li><li><span>When submitting the job from the command line or the WebUI (CLI options go before the jar file): </span><code>./bin/flink run -p 16 &lt;jar-file&gt;</code></li><li><span>Global parallelism in the program: </span><code>env.setParallelism(1)</code></li><li><span>Parallelism of an individual operator: </span><code>.print().setParallelism(1)</code></li></ol><h2 
id='并行度设置的最佳实践'><span>Best practices for setting the parallelism</span></h2><ol start='' ><li><span>Do not set a global parallelism in the code: it takes priority over the value given on the command line, so the job can no longer be rescaled at submission time.</span></li><li><span>Set the parallelism only for specific operators. For a data source, for example, set the parallelism to 1 so that the order of the records is not changed.</span></li><li><span>Set the parallelism when submitting the job, so that the job can be rescaled by resubmitting it with a different value.</span></li></ol><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>5</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " 
role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">env</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">source</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">sum</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">print</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 173px;"></div><div class="CodeMirror-gutters" style="height: 173px;"><div class="CodeMirror-gutter 
CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>First submission of the job:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="bash"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="bash"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>1</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-def">$ flink</span> run <span 
class="cm-attribute">-p</span> <span class="cm-number">8</span> &lt;jar-file&gt;</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 35px;"></div><div class="CodeMirror-gutters" style="height: 35px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>The </span><code>sum</code><span> operator then has a parallelism of 8, while </span><code>source</code><span> and </span><code>print</code><span> have a parallelism of 1.</span></p><p><span>Rescaling the job:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="bash"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="bash"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>1</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background 
CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-def">$ flink</span> run <span class="cm-attribute">-p</span> <span class="cm-number">16</span> &lt;jar-file&gt;</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 35px;"></div><div class="CodeMirror-gutters" style="height: 35px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>The </span><code>sum</code><span> operator then has a parallelism of 16, while </span><code>source</code><span> and </span><code>print</code><span> have a parallelism of 1.</span></p><h2 id='任务提交流程'><span>Job submission flow</span></h2><figure><img src="figure/13.svg" alt="Flink job submission flow" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 15: Flink job submission flow</figcaption></figure><ul><li><span>When the cluster starts, each TaskManager registers its task slots with the ResourceManager.</span></li><li><span>TaskManagers exchange data with one another.</span></li></ul><h2 id='flink中的dag数据结构'><span>DAG data structures in Flink</span></h2><figure><img src="figure/9.svg" alt="Transformation of the DAG data structures in Flink" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 16: Transformation of the DAG data structures in Flink</figcaption></figure><ul><li><p><span>StreamGraph: the initial directed acyclic graph, generated from the code the user writes with the DataStream API. It represents the topology of the program. Source: </span><code>StreamGraph.java</code></p></li><li><p><span>JobGraph: at compile time the StreamGraph is optimized into the JobGraph (source: </span><code>JobGraph.java</code><span>), which is the data structure submitted to the JobManager. The main optimization is to chain multiple eligible operators (no shuffle between them, same parallelism) together into a single </span><span 
style="color:red"><span>task chain node</span></span><span>. All operators in the same task chain node are guaranteed to run in the same thread of the same task slot, so data passes between them by local forwarding (no serialization, deserialization, or network I/O). Two conditions must hold:</span></p><ul><li><span>There is no shuffle between the two operators</span></li><li><span>The two operators have the same parallelism</span></li></ul></li><li><p><span>ExecutionGraph: the JobManager generates the ExecutionGraph (source: </span><code>ExecutionGraph.java</code><span>) from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer.</span></p></li><li><p><span>Physical execution graph: the “graph” that takes shape once the JobMaster thread has scheduled the job according to the ExecutionGraph and deployed the Tasks on the TaskManagers. It is not a concrete data structure.</span></p></li></ul><p><span>Let us look at how the DAG of the following pseudocode is transformed.</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>5</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background 
CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">source</span>.<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">flatMap</span>().<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">reduce</span>().<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 
23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">sink</span>().<span class="cm-variable">setParallelism</span>(<span class="cm-number">2</span>)</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 173px;"></div><div class="CodeMirror-gutters" style="height: 173px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>The StreamGraph is generated first.</span></p><figure><img src="figure/10.svg" alt="StreamGraph" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 17: StreamGraph</figcaption></figure><p><span>The JobGraph is generated from the StreamGraph when the program is compiled on the client.</span></p><ul><li><span>source and flatMap have different parallelism, so they cannot be merged into one task chain.</span></li><li><span>flatMap and reduce have the same parallelism, but there is a shuffle between them (introduced by keyBy), so they cannot be merged into one task chain either.</span></li><li><span>reduce and sink have the same parallelism and no shuffle between them, so they can be merged into one task chain.</span></li></ul><figure><img src="figure/11.svg" alt="JobGraph" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 18: JobGraph</figcaption></figure><p><span>Submitting the JobGraph to the JobManager produces the ExecutionGraph, in which every operator is split into parallel subtasks according to its parallelism.</span></p><p><span>Every vertex of the ExecutionGraph occupies one thread. The figure below has 5 vertices (1 source, 2 flatMap, and 2 reduce-&gt;sink subtasks), so 5 threads are needed to execute it. Each vertex corresponds to one instance of </span><code>Task.java</code><span> in the source code.</span></p><figure><img src="figure/12.svg" alt="ExecutionGraph" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 19: 
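ExecutionGraph</figcaption></figure><p><span>How many task slots does the graph above occupy?</span></p><p><span>At least 2 task slots are needed, because the highest operator parallelism is 2 and subtasks of different operators can share a slot.</span></p><ul><li><span>First task slot: </span><code>source[1]</code><span>, </span><code>flatMap[1/2]</code><span>, </span><code>reduce-&gt;sink[1/2]</code></li><li><span>Second task slot: </span><code>flatMap[2/2]</code><span>, </span><code>reduce-&gt;sink[2/2]</code></li></ul><p><span>At most 5 task slots are occupied, which is the case when the thread of each of the 5 vertices takes a task slot of its own.</span></p><p><span>The JobMaster deploys the ExecutionGraph to the TaskManagers for execution.</span></p><h1 id='datastream-api'><span>DataStream API</span></h1><h2 id='自定义数据源'><span>Custom data sources</span></h2><h3 id='pojo-class'><span>POJO Class</span></h3><blockquote><p>📝<span>Mimics Scala's case classes</span></p></blockquote><ul><li><span>The class must be public</span></li><li><span>All fields must be public (Flink also accepts non-public fields that have public getters and setters)</span></li><li><span>The class must have a public no-argument constructor</span></li></ul><h3 id='sourcefunction'><span>SourceFunction</span></h3><p><span>The type parameter of SourceFunction</span><span>&lt;</span><span>T&gt; is the type of the records emitted by the data source.</span></p><ul><li><p><span>The run method emits the data; it is triggered by the “program started” event.</span></p><ul><li><span>ctx.collect sends a record downstream</span></li><li><span>ctx.collectWithTimestamp sends a record downstream and assigns its event time</span></li><li><span>ctx.emitWatermark sends a watermark downstream</span></li></ul></li><li><p><span>The cancel method runs when the job is cancelled; it is triggered by the “job cancelled” event.</span></p></li></ul><blockquote><p>📝<span>When writing Flink programs, pay attention to the type parameters and to the parameter types of the methods!</span></p></blockquote><p><span>Other APIs for custom data sources</span></p><ul><li><span>ParallelSourceFunction</span><span>&lt;</span><span>T&gt;: a parallel data source</span></li><li><span>RichSourceFunction</span><span>&lt;</span><span>T&gt;: the rich-function version</span></li><li><span>RichParallelSourceFunction</span><span>&lt;</span><span>T&gt;: the rich-function version of the parallel data source</span></li></ul><h2 id='基本转换算子'><span>Basic transformation operators</span></h2><blockquote><p>📝<span>Representative operator: flatMap</span></p></blockquote><p><span>All basic transformation operators are </span><span style="color:red"><span>stateless operators</span></span><span>.</span></p><p><span>Here is an example of a stateless function:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="c"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="c"><div style="overflow: hidden; 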
position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>3</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable-3">int</span> <span class="cm-def">add</span>(<span class="cm-variable-3">int</span> <span class="cm-variable">n</span>) {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" 
CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;<span class="cm-keyword">return</span> <span class="cm-variable">n</span> <span class="cm-operator">+</span> <span class="cm-number">1</span>;</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">}</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 104px;"></div><div class="CodeMirror-gutters" style="height: 104px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>Given the same input, the output is always the same.</span></p><ul><li><span>Semantics of MapFunction&lt;IN, OUT&gt;: for each element of a stream or list, emit exactly one element</span></li><li><span>Semantics of FlatMapFunction&lt;IN, OUT&gt;: for each element of a stream or list, emit zero, one, or more elements</span></li><li><span>Semantics of FilterFunction</span><span>&lt;</span><span>IN&gt;: for each element of a stream or list, emit zero or one element</span></li></ul><p><span>flatMap is a generalization of map and filter. In other words, flatMap can implement both map and filter.</span></p><h2 id='逻辑分区算子'><span>Logical Partitioning Operators</span></h2><blockquote><p>📝<span>Representative operator: reduce</span></p></blockquote><ul><li><p><span>What keyBy does:</span></p><ul><li><span>Specifies the key of each record.</span></li><li><span>Computes from the key which parallel subtask of the downstream operator the record should go to.</span></li><li><span>Routes the record to that parallel subtask of the downstream operator.</span></li></ul></li><li><p><span>Records with the same key are always routed to the same parallel subtask of the downstream operator.</span></p></li><li><p><span>Records with different keys may also be routed to the same parallel subtask of the downstream operator.</span></p></li></ul><blockquote><p><span>keyBy is not an operator, because it performs no computation. It only assigns a key to each record and routes the record to the corresponding parallel subtask.</span></p></blockquote><p><span>reduce is a </span><span style="color:red"><span>stateful operator</span></span><span>.</span></p><p><span>An example of a stateful function: the function below maintains the global variable </span><code>count</code><span> as its internal state.</span></p><pre class="md-fences md-end-block 
md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="c"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="c"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><span><span>​</span>x</span></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable-3">int</span> <span class="cm-variable">count</span> <span class="cm-operator">=</span> <span class="cm-number">0</span>; &nbsp; &nbsp;<span class="cm-comment">// initialize the global variable count</span></span></pre></div><div style="position: relative;"><div 
class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable-3">int</span> <span class="cm-def">add</span>(<span class="cm-variable-3">int</span> <span class="cm-variable">n</span>) {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;<span class="cm-variable">count</span> <span class="cm-operator">+=</span> <span class="cm-variable">n</span>;</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;<span class="cm-keyword">return</span> <span class="cm-variable">count</span>;</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">}</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">6</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span cm-text="" cm-zwsp="">
</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">7</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">add</span>(<span class="cm-number">1</span>); <span class="cm-comment">// =&gt; 1</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">8</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">add</span>(<span class="cm-number">1</span>); <span class="cm-comment">// =&gt; 2</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">9</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">add</span>(<span class="cm-number">1</span>); <span class="cm-comment">// =&gt; 3</span></span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 311px;"></div><div class="CodeMirror-gutters" style="height: 311px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 
31px;"></div></div></div></div></pre><p><span>Given the same input, the output is not necessarily the same.</span></p><ul><li><span>ReduceFunction</span><span>&lt;</span><span>T&gt; semantics: the reduce operator initializes an empty accumulator (of the same type as the stream elements). When the first element arrives, it is stored as the accumulator as-is, and the accumulator is emitted. For the second and every later element, the element is combined with the accumulator, the accumulator is updated, and the updated accumulator is emitted. The reduce method defines how an input element is combined with the accumulator.</span></li><li><span>Each key maintains its own accumulator. Once an input record has updated the accumulator, the record is discarded.</span></li><li><span>reduce can only be used after keyBy.</span></li></ul><h3 id='reduce算子如何维护逻辑分区'><span>How the reduce Operator Maintains Logical Partitions</span></h3><figure><img src="figure/15.svg" alt="How the parallel subtasks of the reduce operator maintain logical partitions" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 20: How the parallel subtasks of the reduce operator maintain logical partitions</figcaption></figure><p><span>Suppose we have the following Flink code:</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 53px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 45px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>13</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div 
class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -45px; width: 45px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 36px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">env</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">fromElements</span>(<span class="cm-number">1</span>,<span class="cm-number">2</span>,<span class="cm-number">3</span>,<span class="cm-number">4</span>,<span class="cm-number">5</span>,<span class="cm-number">6</span>,<span class="cm-number">7</span>,<span class="cm-number">8</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">setParallelism</span>(<span class="cm-number">1</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span 
class="cm-variable">keyBy</span>(<span class="cm-variable">r</span> <span class="cm-operator">-&gt;</span> <span class="cm-variable">r</span> <span class="cm-operator">%</span> <span class="cm-number">3</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">reduce</span>(<span class="cm-keyword">new</span> <span class="cm-variable">ReduceFunction</span><span class="cm-operator">&lt;</span><span class="cm-variable-3">Integer</span><span class="cm-operator">&gt;</span>() {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">6</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp;<span class="cm-meta">@Override</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">7</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp;<span class="cm-keyword">public</span> <span class="cm-variable-3">Integer</span> <span class="cm-variable">reduce</span>(<span class="cm-variable-3">Integer</span> <span class="cm-variable">v1</span>, <span class="cm-variable-3">Integer</span> <span class="cm-variable">v2</span>) <span class="cm-keyword">throws</span> <span class="cm-variable">Exception</span> {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber 
CodeMirror-gutter-elt" style="left: 0px; width: 36px;">8</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;<span class="cm-keyword">return</span> <span class="cm-variable">v1</span> <span class="cm-operator">+</span> <span class="cm-variable">v2</span>;</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">9</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;  }</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 36px;">10</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  })</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">11</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">setParallelism</span>(<span class="cm-number">4</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">12</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">print</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" 
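style="left: 0px; width: 36px;">13</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">setParallelism</span>(<span class="cm-number">4</span>);</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 449px;"></div><div class="CodeMirror-gutters" style="height: 449px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 44px;"></div></div></div></div></pre><p><span>keyBy might then route the data as follows:</span></p><ul><li><span>1, 4, 7 are routed to the third parallel subtask of reduce</span></li><li><span>3, 6 are routed to the third parallel subtask of reduce</span></li><li><span>2, 5, 8 are routed to the fourth parallel subtask of reduce</span></li></ul><figure><img src="figure/14.svg" alt="One possible routing by keyBy" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 21: One possible routing by keyBy</figcaption></figure><h2 id='物理分区算子'><span>Physical Partitioning Operators</span></h2><p><span>These distribute data across the parallel subtasks.</span></p><ul><li><span>shuffle(): sends records to the downstream parallel subtasks at random.</span></li><li><span>rebalance(): sends records round-robin to </span><span style="color:red"><span>all</span></span><span> downstream parallel subtasks.</span></li><li><span>rescale(): sends records round-robin to </span><span style="color:red"><span>a subset</span></span><span> of the downstream parallel subtasks. Used when the parallelism of the downstream operator is an integer multiple of the parallelism of the upstream operator.</span></li><li><span>broadcast(): broadcasts every record to all downstream parallel subtasks.</span></li><li><span>global(): sends all records to the first (index 0) downstream parallel subtask.</span></li><li><span>partitionCustom(): custom partitioning. You decide which downstream parallel subtask the records of a given key are sent to.</span></li></ul><h2 id='富函数'><span>Rich Functions</span></h2><p><span>Lifecycle (the lifecycle of an object, of an HTML page, of a lock, of a database connection, ...)</span></p><p><span>Each parallel subtask of an operator has its own lifecycle.</span></p><ul><li><span>open method: runs once before the operator's computation logic starts. Suited to initialization work (opening a file, a network connection, or a database connection).</span></li><li><span>close method: runs once after the operator's computation logic has finished. Suited to cleanup work (closing a file, a network connection, or a database connection).</span></li><li><span>getRuntimeContext() method: retrieves context information about the running operator, such as the index of the current parallel subtask.</span></li></ul><p><span>Some examples</span></p><ul><li><span>MapFunction </span>➡️<span> 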
RichMapFunction</span></li><li><span>FilterFunction </span>➡️<span> RichFilterFunction</span></li><li><span>FlatMapFunction </span>➡️<span> RichFlatMapFunction</span></li><li><span>ReduceFunction </span>➡️<span> RichReduceFunction</span></li><li><span>SourceFunction </span>➡️<span> RichSourceFunction</span></li><li><span>SinkFunction </span>➡️<span> RichSinkFunction</span></li></ul><h2 id='自定义输出'><span>Custom Sinks</span></h2><p><span>SinkFunction</span><span>&lt;</span><span>T&gt;: the type parameter is the type of the records to be written out</span></p><p><span>RichSinkFunction</span><span>&lt;</span><span>T&gt;</span></p><h1 id='底层api'><span>Low-Level API</span></h1><p><span>The low-level API (process functions) consists entirely of </span><span style="color:red"><span>rich functions</span></span><span>.</span></p><h2 id='processfunction'><span>ProcessFunction</span></h2><p><span>For a stream that has not been keyed with keyBy, you can use ProcessFunction to emit zero, one, or more elements for each element of the stream. (An enhanced RichFlatMapFunction.)</span></p><ul><li><span>ProcessFunction&lt;IN, OUT&gt;: IN is the input type, OUT is the output type</span></li><li><span>processElement: invoked once for every incoming record.</span></li><li><span>Call it with .process(new ProcessFunction&lt;I, O&gt;).</span></li></ul><h2 id='keyedprocessfunction'><span>KeyedProcessFunction</span></h2><p><span>For the keyed stream (KeyedStream) produced by keyBy, you can use KeyedProcessFunction</span></p><ul><li><span>KeyedProcessFunction&lt;KEY, IN, OUT&gt;: KEY is the key type, IN is the input type, OUT is the output type.</span></li><li><span>processElement: invoked once for every incoming record.</span></li><li><span>onTimer: the timer callback. Invoked when time reaches a registered timestamp.</span></li></ul><figure><img src="figure/16.svg" alt="How the parallel subtasks of KeyedProcessFunction maintain timers" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 22: 
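How the parallel subtasks of KeyedProcessFunction maintain timers</figcaption></figure><ul><li><p><span>Each key maintains its own timers and can access only its own timers, just as each key can access only its own accumulator.</span></p></li><li><p><span>For each key, only one timer can be registered per timestamp. Timers are deduplicated: if a timer is already registered for a timestamp, registering another timer for the same timestamp has no effect.</span></p></li><li><p><span>.registerProcessingTimeTimer(ts): registers a timer (onTimer) at the processing-time (machine-time) timestamp ts.</span></p></li><li><p><span>Internal state maintained</span></p><ul><li><span>State variables</span></li><li><span>Timers</span></li></ul></li><li><p><span>processElement and onTimer: these two methods are </span><span style="color:red"><span>atomic</span></span><span> with respect to each other and never run concurrently. Only one of them executes at any moment, because both may operate on the same state variables. For example, if an event arrives while onTimer is running, processElement is called only after onTimer has finished. Likewise, if a watermark arrives that would trigger onTimer while processElement is running, onTimer runs only after processElement has finished.</span></p></li><li><p><span>When a watermark reaches a KeyedProcessFunction and triggers onTimer, the watermark is forwarded downstream only after onTimer has finished.</span></p></li><li><p><span>When a watermark reaches a ProcessWindowFunction and triggers the process method, the watermark is forwarded downstream only after process has finished.</span></p></li></ul><blockquote><p>📝<span>Conceptually, a KeyedProcessFunction maintains several HashMaps: each state variable definition initializes one HashMap, and one more HashMap holds the timer queue of each key.</span></p></blockquote><h2 id='逻辑分区维护的状态-键控状态变量'><span>State Maintained per Logical Partition: Keyed State Variables</span></h2><p><span>Each key maintains its own state variables</span></p><ul><li><span>ValueState: used like an ordinary Java variable.</span></li><li><span>ListState: used like a Java ArrayList.</span></li><li><span>MapState: used like a Java HashMap.</span></li></ul><h3 id='valuestate-值状态变量'><span>ValueState: Value State Variable</span></h3><ul><li><span>Each key can access only its own state variable. The state variable is private to each key.</span></li><li><span>A state variable is a </span><span style="color:red"><span>singleton</span></span><span> and is initialized only once.</span></li><li><span>State variables are periodically saved as checkpoints to the state backend (for example HDFS).</span></li><li><span>When a Flink program starts, it first looks for the state variable in the state backend (for example HDFS). If the variable is not found, it is initialized. If it is found, it is read directly. This matches singleton behavior. Why does a Flink program look in the state backend first at startup? Because Flink does not know whether this is a first start or a restart after a failure. On failure recovery it must look up the state variable in the saved checkpoint and restore the state of the most recent checkpoint.</span></li><li><span>The .getState method looks up the state variable in the state backend through a state descriptor</span></li><li><span>Reading the value of a value state variable: the .value() method</span></li><li><span>Writing a value to the state variable: the .update() method</span></li><li><span>Clearing the state variable: the .clear() method</span></li></ul><figure><img 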
src="figure/17.svg" alt="How the parallel subtasks of KeyedProcessFunction maintain ValueState" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 23: How the parallel subtasks of KeyedProcessFunction maintain ValueState</figcaption></figure><h3 id='liststate-列表状态变量'><span>ListState: List State Variable</span></h3><ul><li><span>.get() method: returns an Iterable over all elements in the list state variable</span></li><li><span>.clear() method: clears the state variable</span></li><li><span>.add() method: appends an element</span></li></ul><figure><img src="figure/19.svg" alt="How the parallel subtasks of KeyedProcessFunction maintain ListState" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 24: How the parallel subtasks of KeyedProcessFunction maintain ListState</figcaption></figure><h3 id='mapstate-字典状态变量'><span>MapState: Map State Variable</span></h3><ul><li><span>.put(KEY, VALUE) method: adds the KEY </span>➡️<span> VALUE pair</span></li><li><span>.get(KEY) method: returns the VALUE of KEY</span></li><li><span>.contains(KEY) method: checks whether KEY exists</span></li><li><span>.keys(): returns an Iterable over all KEYs</span></li></ul><figure><img src="figure/18.svg" alt="How the parallel subtasks of KeyedProcessFunction maintain MapState" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 25: How the parallel subtasks of KeyedProcessFunction maintain MapState</figcaption></figure><h2 id='窗口api'><span>Window API</span></h2><h3 id='processwindowfunction'><span>ProcessWindowFunction</span></h3><p><span>Used on the stream that follows </span><code>stream.keyBy().window()</code></p><ul><li><p><span>ProcessWindowFunction&lt;IN, OUT, KEY, WINDOW&gt;</span></p></li><li><p><span>process method: invoked when the window closes</span></p></li><li><p><span>Windows in Flink are left-closed and right-open. For the window [0,10) in seconds, the process function is triggered when time reaches 9999ms.</span></p></li><li><p><span>Windows are aligned starting from 1970-01-01 00:00:00</span></p></li><li><p><span>Formula for tumbling windows: with a 5-second tumbling window, which window does an event arriving at second 7 belong to?</span></p></li><li><p><span>window start time = timestamp - timestamp % window size</span></p><ul><li><span>windowStartTime = 1234ms - 1234 % 5000 = 0ms</span></li><li><span>windowStartTime = 7000ms - 7000 % 5000 = 
5000ms</span></li></ul></li><li><p><span>A window is created only when the first record belonging to it arrives</span></p></li><li><p><span>Internal state of a window:</span></p><ul><li><span>All events that belong to the window</span></li><li><span>A timer: timestamp = window end time - 1 millisecond (because the interval is left-closed and right-open). Its callback is the process function</span></li></ul></li></ul><blockquote><p>⚠️<span>When only ProcessWindowFunction is used, the iterable parameter of the process method contains all records that belong to the window, which puts pressure on memory. How can this be optimized? By using the accumulator idea.</span></p></blockquote><h3 id='aggregatefunction'><span>AggregateFunction</span></h3><p><span>An incremental aggregation function. The key idea is to maintain one accumulator per window.</span></p><ul><li><span>AggregateFunction&lt;IN, ACC, OUT&gt;</span></li><li><span>createAccumulator: creates an empty accumulator. The return type is the accumulator type</span></li><li><span>add: defines how an input record is aggregated into the accumulator. The return value is the updated accumulator</span></li><li><span>getResult: emits the aggregation result when the window closes. The return value is the result to be emitted</span></li><li><span>merge: usually does not need to be implemented. Only used with </span><span style="color:red"><span>event-time session windows</span></span><span>.</span></li></ul><h3 id='将aggregatefunction和processwindowfunction结合使用'><span>Combining AggregateFunction with ProcessWindowFunction</span></h3><p><span>When the window closes, AggregateFunction passes the return value of getResult() to ProcessWindowFunction.</span></p><figure><img src="figure/25.svg" alt="AggregateFunction sends the result of getResult to ProcessWindowFunction" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 26: AggregateFunction sends the result of getResult to ProcessWindowFunction</figcaption></figure><p><span>The iterable parameter of ProcessWindowFunction's process method then contains exactly one element.</span></p><h3 id='窗口的底层实现'><span>Underlying Implementation of Windows</span></h3><p><strong><span>Underlying implementation when AggregateFunction and ProcessWindowFunction are combined</span></strong></p><figure><img src="figure/26.svg" alt="AggregateFunction and ProcessWindowFunction" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 27: AggregateFunction and ProcessWindowFunction</figcaption></figure><p><strong><span>Underlying implementation when only ProcessWindowFunction is used</span></strong></p><figure><img src="figure/27.svg" alt="Using only ProcessWindowFunction" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption 
style="text-align:center;color:brown;">Figure 28: Using only ProcessWindowFunction</figcaption></figure><p><span>All elements of the window are stored in a List.</span></p><h3 id='processallwindowfunction'><span>ProcessAllWindowFunction</span></h3><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>3</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span 
role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">stream</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">windowAll</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">process</span>(<span class="cm-keyword">new</span> <span class="cm-variable">ProcessAllWindowFunction</span>())</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 104px;"></div><div class="CodeMirror-gutters" style="height: 104px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>Opening a window directly on the stream is equivalent to using keyBy to assign every record the same key and then opening a window.</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" 
tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>4</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">stream</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span>(<span class="cm-variable">r</span> <span class="cm-operator">-&gt;</span> <span class="cm-number">1</span>)</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" 
style="padding-right: 0.1px;">  .<span class="cm-variable">window</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">process</span>(<span class="cm-keyword">new</span> <span class="cm-variable">ProcessWindowFunction</span>())</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 138px;"></div><div class="CodeMirror-gutters" style="height: 138px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><h3 id='触发器'><span>Triggers</span></h3><blockquote><p><span>The window size is 1 day, but you want to compute the window's statistics every 10 seconds. What can you do?</span></p></blockquote><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="java"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="java"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div 
class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>5</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-variable">stream</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">keyBy</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">window</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 23px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span 
class="cm-variable">trigger</span>()</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">  .<span class="cm-variable">process</span>()<span class="cm-operator">/</span><span class="cm-variable">aggregate</span>()</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 173px;"></div><div class="CodeMirror-gutters" style="height: 173px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>A trigger fires the process/aggregate method that follows trigger in the call chain.</span></p><p><span>Trigger&lt;T, W&gt;: T is the type of the elements in the window, and W is the window type.</span></p><p><span>TriggerResult</span></p><ul><li><span>CONTINUE: do nothing</span></li><li><span>FIRE: trigger the window computation</span></li><li><span>PURGE: clear the elements in the window</span></li><li><span>FIRE_AND_PURGE: trigger the window computation, then clear the elements in the window</span></li></ul><p><span>Core methods of Trigger</span></p><ul><li><span>onElement: called once for every element that arrives</span></li><li><span>onProcessingTime: called when a processing-time timer fires</span></li><li><span>onEventTime: called when an event-time timer fires</span></li><li><span>clear: called when the window closes</span></li></ul><h1 id='窗口'><span>Windows</span></h1><h2 id='窗口概念'><span>Window Concepts</span></h2><ul><li><span>Real-world streams are generally unbounded. How do we process unbounded data?</span></li><li><span>We can slice the unbounded stream into finite data sets and process those, that is, turn it into bounded streams</span></li><li><span>A window is one way of cutting an unbounded stream into bounded ones: it distributes the stream's data into buckets of finite size for analysis</span></li></ul><h2 id='窗口的本质'><span>The Essence of Windows</span></h2><p><span>We usually call .window after .keyBy to open windows. In effect, within each logical partition produced by keyBy, the data is logically partitioned once more by window. Key the stream first, then window it.</span></p><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="sql"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="sql"><div style="overflow: hidden; position: relative; width: 3px; height: 
0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>1</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-keyword">SELECT</span> <span class="cm-operator">*</span> <span class="cm-keyword">FROM</span> <span class="cm-keyword">table</span> <span class="cm-keyword">GROUP</span> <span class="cm-keyword">BY</span> <span class="cm-keyword">key</span><span class="cm-punctuation">,</span> window<span class="cm-punctuation">;</span></span></pre></div></div></div></div></div></div><div style="position: absolute; 
height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 35px;"></div><div class="CodeMirror-gutters" style="height: 35px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><p><span>A Flink window is a half-open interval, closed on the left and open on the right. For example, the last timestamp in the window [0 s, 5 s) is 4999 ms.</span></p><h2 id='窗口类型'><span>Window Types</span></h2><ul><li><p><span>Time windows (Time Window)</span></p><ul><li><span>Tumbling time windows</span></li><li><span>Sliding time windows</span></li><li><span>Session windows</span></li></ul></li><li><p><span>Count windows (Count Window)</span></p><ul><li><span>Tumbling count windows</span></li><li><span>Sliding count windows</span></li></ul></li></ul><h2 id='滚动窗口'><span>Tumbling Windows</span></h2><ul><li><span>Slice the data by a fixed window length</span></li><li><span>Windows are time-aligned, have a fixed length, and do not overlap</span></li></ul><figure><img src="figure/28.svg" alt="滚动窗口" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 29: Tumbling windows</figcaption></figure><h2 id='滑动窗口'><span>Sliding Windows</span></h2><figure><img src="figure/29.svg" alt="滑动窗口" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 30: Sliding windows</figcaption></figure><ul><li><span>A sliding window is a generalization of the fixed (tumbling) window; it is defined by a fixed window length and a slide interval</span></li><li><span>The window length is fixed, and windows may overlap</span></li><li><span>If an element belongs to two windows at once, it is copied so that each window gets its own copy. This ensures that all windows are processed as logically separate units.</span></li></ul><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="js" style="break-inside: unset;"><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="js"><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 53px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div 
class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 45px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>16</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation" style=""><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -45px; width: 45px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 36px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">{</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">2</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp;<span class="cm-string">"Mary"</span>: <span class="cm-variable">MapState</span> {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">3</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span 
class="cm-number">0</span>,<span class="cm-number">10</span>): [<span class="cm-meta">...</span>,<span class="cm-variable">Event</span>(<span class="cm-number">7</span><span class="cm-variable">s</span>)],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">4</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span class="cm-number">5</span>,<span class="cm-number">15</span>): [<span class="cm-meta">...</span>,<span class="cm-variable">Event</span>(<span class="cm-number">7</span><span class="cm-variable">s</span>)],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">5</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span class="cm-number">10</span>,<span class="cm-number">20</span>): [<span class="cm-meta">...</span>],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">6</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp; &nbsp;<span class="cm-meta">...</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">7</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp; &nbsp;<span class="cm-meta">...</span></span></pre></div><div 
style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">8</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;  },</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">9</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp;<span class="cm-string">"Bob"</span>: <span class="cm-variable">MapState</span> {</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 36px;">10</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span class="cm-number">0</span>,<span class="cm-number">10</span>): [<span class="cm-meta">...</span>,<span class="cm-variable">Event</span>(<span class="cm-number">7</span><span class="cm-variable">s</span>)],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">11</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span class="cm-number">5</span>,<span class="cm-number">15</span>): [<span class="cm-meta">...</span>,<span class="cm-variable">Event</span>(<span class="cm-number">7</span><span class="cm-variable">s</span>)],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div 
class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">12</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp;  (<span class="cm-number">10</span>,<span class="cm-number">20</span>): [<span class="cm-meta">...</span>],</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">13</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp; &nbsp;<span class="cm-meta">...</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">14</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp; &nbsp; &nbsp; &nbsp;<span class="cm-meta">...</span></span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt" style="left: 0px; width: 36px;">15</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"> &nbsp;  },</span></pre></div><div style="position: relative;"><div class="CodeMirror-gutter-wrapper" style="left: -45px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 36px;">16</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;">}</span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 553px;"></div><div class="CodeMirror-gutters" style="height: 
553px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 44px;"></div></div></div></div></pre><h2 id='会话窗口'><span>Session Windows</span></h2><ul><li><span>The length of a session window is </span><span style="color:red"><span>not fixed</span></span><span>.</span></li><li><span>A session window is defined by a timeout.</span></li><li><span>Session window start time: the timestamp of the first element in the window</span></li><li><span>Session window end time: the timestamp of the last element in the window + the timeout</span></li></ul><figure><img src="figure/30.svg" alt="会话窗口" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 31: Session windows</figcaption></figure><h1 id='逻辑时钟-水位线'><span>Logical Clocks: Watermarks</span></h1><p><span>Two time semantics</span></p><figure><img src="figure/event_processing_time.svg" alt="两种时间语义" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 32: Two time semantics</figcaption></figure><ul><li><span>Processing time: the machine time at which an event enters the process operator, that is, the machine time returned by calling </span><code>ctx.timerService().currentProcessingTime()</code><span>.</span></li><li><span>Event time: the timestamp carried in the event, that is, the time at which the event actually occurred.</span></li></ul><p><span>Two examples:</span></p><figure><img src="figure/31.svg" alt="星球大战" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 33: Star Wars</figcaption></figure><figure><img src="figure/34.svg" alt="打游戏" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 34: Playing a game</figcaption></figure><p><span 
style="color:red"><span>The world of event time has no clock of its own</span></span><span>, so we have to supply one. That clock is called the watermark (a logical clock).</span></p><p><span>Definition of a clock: a monotonically increasing sequence.</span></p><p><span>If the monotonically increasing sequence is produced by the CPU, it is a physical clock. If it is produced by program code, it is a logical clock.</span></p><p><span>Whenever a watermark flows into an operator, that operator updates its own clock (think of the wristwatch of a worker on a production line). A watermark never overtakes: it is never propagated ahead of the data in front of it. If an operator's computation is slow, it blocks the downstream propagation of the watermark. When an upstream operator blocks the watermark in this way, the upstream and downstream operators may hold different watermarks (clocks).</span></p><p><span>With processing time (ProcessingTime), out-of-order data does not exist.</span></p><p><span>Out-of-order data exists only when event time is used.</span></p><p><span>Clocks:</span></p><ul><li><span>Physical clock: machine time, the wall clock</span></li><li><span>Logical clock: the watermark</span></li></ul>
<p><span>What are clocks used for in Flink?</span></p><ul><li><span>Firing timers</span></li><li><span>Closing windows</span></li></ul><h2 id='有关水位线的一些约定'><span>Some Conventions about Watermarks</span></h2><ol start='' ><li><p><span>The watermark is the logical clock in the world of event time.</span></p></li><li><p><span>Flink assumes that all events with a timestamp </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 -636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-28-TEX-N-2264" d="M674 636Q682 636 688 630T694 615T687 601Q686 600 417 472L151 346L399 228Q687 92 691 87Q694 81 694 76Q694 58 676 56H670L382 192Q92 329 90 331Q83 336 83 348Q84 359 96 365Q104 369 382 500T665 634Q669 636 674 636ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2264" xlink:href="#MJX-28-TEX-N-2264"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>≤</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\le</script><span> the watermark have already arrived.</span></p></li><li><p><span>A watermark is a </span><span style="color:red"><span>special event</span></span><span>. The programmer inserts it into the data stream in code (by calling </span><code>assignTimestampsAndWatermarks</code><span>), and it flows along with the stream.</span></p></li><li><p><span>Each parallel subtask of an operator maintains its own watermark (logical clock) and updates it whenever it receives a watermark from upstream.</span></p></li><li><p><span>Watermark = largest event timestamp observed so far - maximum delay - 1 ms</span></p><ul><li><span>The maximum delay has to be chosen by the programmer based on experience</span></li></ul></li><li><p><span>Flink inserts a watermark with a timestamp of negative infinity at the very beginning of the stream</span></p></li><li><p><span>Flink inserts a watermark with a timestamp of positive infinity at the very end of the stream</span></p></li><li><p><span>Watermark </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 
-636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-27-TEX-N-2265" d="M83 616Q83 624 89 630T99 636Q107 636 253 568T543 431T687 361Q694 356 694 346T687 331Q685 329 395 192L107 56H101Q83 58 83 76Q83 77 83 79Q82 86 98 95Q117 105 248 167Q326 204 378 228L626 346L360 472Q291 505 200 548Q112 589 98 597T83 616ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2265" xlink:href="#MJX-27-TEX-N-2265"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>≥</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\ge</script><span> the window end time: the window computation fires.</span></p></li><li><p><span>Watermark </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 -636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-27-TEX-N-2265" d="M83 616Q83 624 89 630T99 636Q107 636 253 568T543 431T687 361Q694 356 694 346T687 331Q685 329 395 192L107 56H101Q83 58 83 76Q83 77 83 79Q82 86 98 95Q117 105 248 167Q326 204 378 228L626 346L360 472Q291 505 200 548Q112 589 98 597T83 616ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2265" xlink:href="#MJX-27-TEX-N-2265"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>≥</mo></math></mjx-assistive-mml></mjx-container><script 
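type="math/tex">\ge</script><span> a timer's timestamp: the timer fires.</span></p></li></ol><h2 id='多流转换时水位线的传播机制'><span>How Watermarks Propagate in Multi-Stream Transformations</span></h2><figure><img src="figure/32.svg" alt="多流转换水位线传播机制" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 35: Watermark propagation in multi-stream transformations</figcaption></figure><ul><li><p><span>Splitting a stream: the watermark is copied and broadcast to all downstream parallel subtasks.</span></p></li><li><p><span>Merging streams:</span></p><ul><li><span>Step 1: the arriving watermark overwrites the corresponding entry in the subtask's watermark array.</span></li><li><span>Step 2: the smallest watermark in the array is used to update the parallel subtask's logical clock.</span></li></ul></li></ul><h2 id='水位线设置的最佳实践'><span>Best Practices for Setting Watermarks</span></h2><ul><li><span>Insert watermarks (assignTimestampsAndWatermarks) as close to the data source as possible</span></li><li><span>The operators before assignTimestampsAndWatermarks should preferably have a parallelism of 1</span></li></ul><h2 id='kafka水位线设置'><span>Setting Watermarks with Kafka</span></h2><p><span>The parallelism of the data source equals the number of partitions of the Kafka topic.</span></p><p><span>Every operator before the point where watermarks are inserted has the same parallelism as the number of partitions of the Kafka topic.</span></p><h2 id='如何处理迟到数据'><span>How to Handle Late Data</span></h2><p><span>An arriving element whose timestamp is less than or equal to the current operator's watermark is late data. </span><span style="color:red"><span>Late elements exist only when event time is used</span></span><span>. With processing time there is no late data.</span></p><p><span>Strategies for handling late data:</span></p><ul><li><p><span>Default strategy</span></p><ul><li><span>If an arriving element is late but the window it belongs to still exists, the element can still enter the window.</span></li><li><span>If an arriving element is late and the window it belongs to has already been destroyed, the element is dropped.</span></li></ul></li><li><p><span>Send late data to a side output. A side output is a bypass stream separate from the main output, and arbitrary data can be sent to it.</span></p><ul><li><code>new OutputTag&lt;T&gt;(&quot;output-tag&quot;){}</code><span>: a singleton</span></li></ul></li><li><p><span>Use late elements to update the window's result. That is, when the watermark reaches the window end time, the window computation fires, but the window is not destroyed; it keeps waiting for late elements for a while longer.</span></p><ul><li><code>.allowedLateness(Time.seconds(5))</code><span>: the window waits 5 more seconds for late events</span></li><li><span>The window is actually destroyed when: watermark </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 -636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-27-TEX-N-2265" d="M83 616Q83 624 89 630T99 636Q107 636 253 568T543 431T687 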
361Q694 356 694 346T687 331Q685 329 395 192L107 56H101Q83 58 83 76Q83 77 83 79Q82 86 98 95Q117 105 248 167Q326 204 378 228L626 346L360 472Q291 505 200 548Q112 589 98 597T83 616ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2265" xlink:href="#MJX-27-TEX-N-2265"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>≥</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\ge</script><span> window end time + allowedLateness</span></li><li><span>The window first fires when: watermark </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 -636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-27-TEX-N-2265" d="M83 616Q83 624 89 630T99 636Q107 636 253 568T543 431T687 361Q694 356 694 346T687 331Q685 329 395 192L107 56H101Q83 58 83 76Q83 77 83 79Q82 86 98 95Q117 105 248 167Q326 204 378 228L626 346L360 472Q291 505 200 548Q112 589 98 597T83 616ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2265" xlink:href="#MJX-27-TEX-N-2265"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math xmlns="http://www.w3.org/1998/Math/MathML"><mo>≥</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\ge</script><span> window end time; the window is not destroyed after this computation</span></li></ul></li></ul><p><strong><span>Why handle late data</span></strong><span>: by saving late data we can count how many late elements arrive each day, which makes it easier to tune the maximum delay setting and improve the correctness of the results.</span></p><h1 
id='多流合并'><span>Merging Multiple Streams</span></h1><p><span>Flink merges multiple streams in </span><span style="color:red"><span>FIFO</span></span><span> order.</span></p><ul><li><p><span>union</span></p><ul><li><span>All streams must have the same element type</span></li><li><span>Can merge more than two streams: </span><code>stream1.union(stream2, stream3)</code></li></ul></li><li><p><span>connect</span></p><ul><li><p><span>Can merge only two streams</span></p></li><li><p><span>The two streams may have different element types</span></p></li><li><p><span>DataStream API</span></p><ul><li><p><span>CoMapFunction&lt;IN1, IN2, OUT&gt;</span></p><ul><li><span>map1</span></li><li><span>map2</span></li></ul></li><li><p><span>CoFlatMapFunction&lt;IN1, IN2, OUT&gt;</span></p><ul><li><span>flatMap1: called when an event from the first stream enters the CoFlatMapFunction.</span></li><li><span>flatMap2: called when an event from the second stream enters the CoFlatMapFunction.</span></li></ul></li></ul></li><li><p><span>Low-level API</span></p><ul><li><p><span>CoProcessFunction&lt;IN1, IN2, OUT&gt;</span></p><ul><li><span>processElement1</span></li><li><span>processElement2</span></li><li><span>onTimer</span></li><li><span>State variables</span></li></ul></li></ul></li><li><p><span>Use cases</span></p><ul><li><span>One stream is keyed with keyBy and the other is broadcast, so that every logical partition can be joined with the same stream.</span></li><li><span>Both streams are keyed with keyBy, so that data with the same key from the two streams is processed together, that is, placed in the same logical partition.</span></li></ul></li></ul></li></ul><pre class="md-fences md-end-block md-fences-with-lineno ty-contain-cm modeLoaded" spellcheck="false" lang="sql            "><div class="CodeMirror cm-s-inner cm-s-null-scroll CodeMirror-wrap" lang="sql            "><div style="overflow: hidden; position: relative; width: 3px; height: 0px; top: 15.2734px; left: 40px;"><textarea autocorrect="off" autocapitalize="off" spellcheck="false" tabindex="0" style="position: absolute; bottom: -1em; padding: 0px; width: 1000px; height: 1em; outline: none;"></textarea></div><div class="CodeMirror-scrollbar-filler" cm-not-content="true"></div><div class="CodeMirror-gutter-filler" cm-not-content="true"></div><div class="CodeMirror-scroll" tabindex="-1"><div class="CodeMirror-sizer" style="margin-left: 
32px; margin-bottom: 0px; border-right-width: 0px; padding-right: 0px; padding-bottom: 0px;"><div style="position: relative; top: 0px;"><div class="CodeMirror-lines" role="presentation"><div role="presentation" style="position: relative; outline: none;"><div class="CodeMirror-measure"><pre><span>xxxxxxxxxx</span></pre><div class="CodeMirror-linenumber CodeMirror-gutter-elt"><div>1</div></div></div><div class="CodeMirror-measure"></div><div style="position: relative; z-index: 1;"></div><div class="CodeMirror-code" role="presentation"><div class="CodeMirror-activeline" style="position: relative;"><div class="CodeMirror-activeline-background CodeMirror-linebackground"></div><div class="CodeMirror-gutter-background CodeMirror-activeline-gutter" style="left: -32px; width: 32px;"></div><div class="CodeMirror-gutter-wrapper CodeMirror-activeline-gutter" style="left: -32px;"><div class="CodeMirror-linenumber CodeMirror-gutter-elt CodeMirror-linenumber-show" style="left: 0px; width: 23px;">1</div></div><pre class=" CodeMirror-line " role="presentation"><span role="presentation" style="padding-right: 0.1px;"><span class="cm-keyword">SELECT</span> <span class="cm-operator">*</span> <span class="cm-keyword">FROM</span> A <span class="cm-keyword">JOIN</span> B <span class="cm-keyword">ON</span> A<span class="cm-variable-2">.key</span><span class="cm-operator">=</span>B<span class="cm-variable-2">.key</span><span class="cm-punctuation">;</span></span></pre></div></div></div></div></div></div><div style="position: absolute; height: 0px; width: 1px; border-bottom: 0px solid transparent; top: 35px;"></div><div class="CodeMirror-gutters" style="height: 35px;"><div class="CodeMirror-gutter CodeMirror-linenumbers" style="width: 31px;"></div></div></div></div></pre><h1 id='flink中的状态变量'><span>State Variables in Flink</span></h1><ul><li><p><span>Operator State</span></p><ul><li><span>Operator state is scoped to a parallel subtask of an operator: all data processed by the same parallel subtask can access the same state.</span></li><li><span>Operator state is shared within the same parallel subtask.</span></li><li><span>Operator state cannot be accessed by another parallel subtask, whether of the same operator or of a different one.</span></li></ul></li><li><p><span>Keyed State</span></p><ul><li><span>Keyed state is maintained and accessed according to the key defined on the input stream.</span></li><li><span>Flink maintains one state instance per key and routes all data with the same key to the same parallel subtask of the operator; that subtask maintains and processes the state for the key.</span></li><li><span>When a task processes an element, state access is automatically scoped to the key of the current element.</span></li><li><span>Under the hood, keyed state is maintained as a HashMap inside the parallel subtask.</span></li></ul></li><li><p><span>Windowed State: scoped to each individual window.</span></p></li></ul><figure><img src="figure/33.svg" alt="各种状态变量的作用域示意图" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 36: Scopes of the different kinds of state variables</figcaption></figure><h2 id='flink中的状态管理'><span>State Management in Flink</span></h2><h3 id='状态后端'><span>State Backends</span></h3><ul><li><p><span>For every incoming element, a stateful operator task reads and updates its state</span></p></li><li><p><span>Because efficient state access is critical for low-latency processing, each parallel subtask maintains its state locally (in JVM heap memory) to ensure fast access</span></p></li><li><p><span>How state is stored, accessed, and maintained is determined by a pluggable component called the state backend</span></p></li><li><p><span>A state backend is mainly responsible for two things</span></p><ul><li><span>Managing local state</span></li><li><span>Writing state to checkpoint files in remote storage (HDFS or another durable file system).</span></li></ul></li></ul><h3 id='选择一个状态后端'><span>Choosing a State Backend</span></h3><ul><li><p><span>MemoryStateBackend (the default)</span></p><ul><li><span>An in-memory state backend. It manages state as objects in memory, keeps local state on the JVM heap of the task manager, and stores checkpoints in the memory of the job manager.</span></li><li><span>Characteristics: fast and low-latency, but not robust</span></li></ul></li><li><p><span>FsStateBackend (the most commonly used)</span></p><ul><li><span>Writes checkpoints to files on a remote durable file system (FileSystem). Local state, as with MemoryStateBackend, is kept on the JVM heap of the task manager.</span></li><li><span>Combines in-memory local access speed with better fault-tolerance guarantees</span></li></ul></li><li><p><span>RocksDBStateBackend</span></p><ul><li><span>Serializes all local state and stores it in a RocksDB instance on the task manager, outside the JVM heap.</span></li><li><span>RocksDB is a disk-based key-value database.</span></li></ul></li></ul><h1 id='flink的容错机制'><span>Fault Tolerance in Flink</span></h1><h2 id='flink程序如何从检查点恢复程序'><span>How a Flink Program Recovers from a Checkpoint</span></h2><figure><img src="figure/35.svg" alt="快照" 
style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 37: Snapshot</figcaption></figure><ul><li><span>At the core of Flink's failure recovery mechanism are consistent checkpoints of the application state.</span></li><li><span>A consistent checkpoint of a stateful streaming application is a copy (a snapshot) of the state of all parallel subtasks at one point in time.</span></li><li><span>The upstream source is a durable system whose read position can be reset (for example, Kafka).</span></li></ul><figure><img src="figure/36.svg" alt="" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 38</figcaption></figure><figure><img src="figure/37.svg" alt="第一步：重启" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 39: Step 1, restart</figcaption></figure><figure><img src="figure/38.svg" alt="第二步：从检查点文件恢复状态" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 40: Step 2, restore state from the checkpoint files</figcaption></figure><figure><img src="figure/39.svg" alt="第三步：继续处理数据" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 41: Step 3, resume processing data</figcaption></figure><p><span>No matter how many times the program restarts, each value is added to the accumulator only once. This is called </span><span style="color:red"><span>exactly-once processing (Exactly-Once)</span></span><span>: no data is lost, and no value is accumulated twice.</span></p><h2 id='flink如何保存检查点'><span>How Flink Takes Checkpoints</span></h2><p><strong><span>Checkpointing algorithms</span></strong></p><ul><li><p><span>The synchronous approach</span></p><ul><li><span>Pause the application, save the state to a checkpoint, then resume the application (Spark Streaming)</span></li></ul></li><li><p><span>The asynchronous approach (Flink's improved implementation)</span></p><ul><li><span>An asynchronous distributed snapshot algorithm based on the Chandy-Lamport algorithm</span></li><li><span>Decouples checkpointing from data processing, so the application as a whole is never paused</span></li></ul></li></ul><p><strong><span>The algorithm's core mechanism: the checkpoint barrier</span></strong></p><blockquote><p><span>Term used in Flink: checkpoint barrier</span></p></blockquote><ul><li><span>A checkpoint barrier is a </span><span 
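style="color:red"><span>special kind of event</span></span><span> that separates the records of a stream into the checkpoints they belong to.</span></li><li><span>State changes caused by records that arrive before a barrier are included in the checkpoint that the barrier belongs to; changes caused by records that arrive after it are included in a later checkpoint.</span></li><li><span>Whenever a barrier reaches a parallel subtask, that subtask's state is snapshotted.</span></li></ul><blockquote><p>📝</p><p><span>Flink has two kinds of special events:</span></p><ul><li><span>Watermarks</span></li><li><span>Checkpoint barriers</span></li></ul></blockquote><figure><img src="figure/40.svg" alt="Example program" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 42: Example program</figcaption></figure><figure><img src="figure/41.svg" alt="Triggering a checkpoint" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 43: Triggering a checkpoint</figcaption></figure><p><span>The JobManager starts a checkpoint by sending a message carrying the new checkpoint ID to every parallel subtask of each source.</span></p><figure><img src="figure/42.svg" alt="Sending checkpoint barriers downstream" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 44: Sending checkpoint barriers downstream</figcaption></figure><p><span>When a stream fans out, checkpoint barriers are broadcast to all downstream subtasks, the same way watermarks are propagated on fan-out.</span></p><figure><img src="figure/43.svg" alt="Barrier alignment" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 45: Barrier alignment</figcaption></figure><ul><li><span>Barrier alignment: as barriers travel downstream, the sum task waits until a barrier has arrived from every input partition.</span></li><li><span>Records that keep arriving on a partition whose barrier has already arrived are buffered.</span></li><li><span>Records on partitions whose barrier has not yet arrived are processed as usual.</span></li></ul><p><span>When does sum_even take its snapshot? Only after it has received the checkpoint barriers from both source1 and source2. This mechanism is called </span><span style="color:red"><span>checkpoint barrier alignment</span></span><span>. When streams merge, checkpoint barriers are therefore handled differently from watermarks.</span></p><figure><img src="figure/44.svg" alt="Continuing to send checkpoint barriers downstream" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 46: 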
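Continuing to send checkpoint barriers downstream</figcaption></figure><p><span>The source's parallel subtasks write their state to the checkpoint files and emit a checkpoint barrier.</span></p><p><span>Once the state has been written to the checkpoint files, the state backend notifies the source subtasks, which then acknowledge to the JobManager that their part of the checkpoint is complete.</span></p><figure><img src="figure/45.svg" alt="Tasks process data as usual" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 47: Tasks process data as usual</figcaption></figure><figure><img src="figure/46.svg" alt="Checkpoint complete" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 48: Checkpoint complete</figcaption></figure><h1 id='端到端一致性'><span>End-to-End Consistency</span></h1><h2 id='状态一致性分类'><span>Levels of State Consistency</span></h2><ul><li><p><span>AT-MOST-ONCE</span></p><ul><li><span>When a task fails, the simplest option is to do nothing: neither restore the lost state nor replay the lost data. At-most-once semantics means each event is processed at most once. UDP is an example: it provides no consistency guarantee at all.</span></li></ul></li><li><p><span>AT-LEAST-ONCE</span></p><ul><li><span>In most real-world applications we do not want to lose events. This guarantee is called at-least-once: every event is processed, and some events may be processed more than once.</span></li></ul></li><li><p><span>EXACTLY-ONCE</span></p><ul><li><span>Exactly-once is the strictest guarantee and the hardest to implement. It means not only that no event is lost, but also that the internal state is updated exactly once for each record.</span></li></ul></li></ul><h2 id='端到端状态一致性'><span>End-to-End State Consistency</span></h2><ul><li><span>The consistency guarantees discussed so far are implemented by the stream processor, that is, they hold inside Flink. A real application, however, also includes a data source (for example Kafka) and a sink that writes to a persistent system.</span></li><li><span>An end-to-end consistency guarantee means that results are correct across the whole streaming application, with every component ensuring its own consistency.</span></li><li><span>The end-to-end consistency level is determined by the weakest component.</span></li></ul><h2 id='端到端exactly-once一致性保障'><span>End-to-End EXACTLY-ONCE Guarantees</span></h2><ul><li><p><span>Internal guarantee: checkpoints (the distributed asynchronous snapshot algorithm)</span></p></li><li><p><span>Source: the read position can be reset (Kafka, FileSystem)</span></p></li><li><p><span>Sink: on recovery from a failure, data is not written to the external system a second time</span></p><ul><li><span>Idempotent writes</span></li><li><span>Transactional writes</span></li></ul></li></ul><h2 id='幂等写入'><span>Idempotent Writes</span></h2><p><span>An idempotent operation can be executed many times but changes the result only once; repeating it afterwards has no further effect.</span></p><h2 id='事务写入'><span>Transactional Writes</span></h2><ul><li><p><span>Transaction  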
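</span></p><ul><li><span>A tightly coupled series of operations in an application: either all of them complete successfully, or every change made by any of them is rolled back (ACID).</span></li><li><span>Atomicity: the operations in a transaction either all succeed or none of them takes effect.</span></li></ul></li><li><p><span>Idea: each transaction corresponds to a checkpoint, and the results are written to the sink system only once that checkpoint has actually completed.</span></p></li><li><p><span>Implementations</span></p><ul><li><span>Write-ahead log (WAL): guarantees only at-least-once</span></li><li><span>Two-phase commit (2PC): can guarantee exactly-once</span></li></ul></li></ul><h2 id='预写式日志'><span>Write-Ahead Log</span></h2><ul><li><span>Result records (the data to be emitted) are first buffered in the state backend and then, when the checkpoint-complete notification arrives, written to the sink system in one batch (state backend </span>➡️<span> downstream system). If the job fails partway through that write, part of the batch is written again after recovery, which is why WAL guarantees only at-least-once.</span></li><li><span>Simple to implement: because the data is already stored in the state backend, this approach works in one batch with any kind of downstream system.</span></li><li><span>The DataStream API provides a template class, GenericWriteAheadSink, for implementing such write-ahead-log sinks.</span></li></ul><h2 id='两阶段提交'><span>Two-Phase Commit</span></h2><ul><li><p><span>For each checkpoint, the sink task starts a transaction in the downstream system (for example MySQL or Kafka) and adds all subsequently received records to it.</span></p></li><li><p><span>The records are written to the external sink system within that transaction but are not yet committed; this is only the “pre-commit”.</span></p></li><li><p><span>Only when the sink operator is notified that the checkpoint has completed (all parallel subtasks have finished saving their snapshots) does it commit the downstream transaction, making the results actually visible.</span></p><ul><li><span>This approach achieves true Exactly-Once, but it requires an external sink system with transaction support. Flink provides the TwoPhaseCommitSinkFunction abstract class for this purpose.</span></li><li><span>Results may not be visible in the sink for a while, because they appear only after the transaction is committed.</span></li></ul></li></ul><h2 id='总结'><span>Summary</span></h2><figure><table><thead><tr><th>&nbsp;</th><th><span>Non-resettable source</span></th><th><span>Resettable source</span></th></tr></thead><tbody><tr><td><span>Any sink</span></td><td><span>at-most-once</span></td><td><span>at-least-once</span></td></tr><tr><td><span style="color:red"><span>Idempotent sink</span></span></td><td><span>at-most-once</span></td><td><span>exactly-once</span></td></tr><tr><td><span>Write-ahead-log sink</span></td><td><span>at-most-once</span></td><td><span>at-least-once</span></td></tr><tr><td><span style="color:red"><span>Two-phase-commit sink</span></span></td><td><span>at-most-once</span></td><td><span>exactly-once</span></td></tr></tbody></table></figure><h2 id='kafka--flink--kafka端到端一致性'><span>Kafka </span>➡️<span> Flink </span>➡️<span> 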
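Kafka End-to-End Consistency</span></h2><ul><li><span>Internal: the checkpoint mechanism persists state (to HDFS), so it can be restored after a failure, which guarantees internal state consistency.</span></li><li><span>Source: FlinkKafkaConsumer serves as the source and saves its offsets; if the job later fails, the connector resets the offsets on recovery and re-consumes the data, which preserves consistency.</span></li><li><span>Sink: FlinkKafkaProducer serves as the sink.</span></li></ul><h3 id='两阶段提交步骤'><span>Two-Phase Commit Steps</span></h3><ol start='' ><li><span>When the first record arrives, the sink opens a Kafka transaction; records are written to the Kafka partition log as usual but marked as uncommitted. This is the “pre-commit”.</span></li><li><span>The JobManager triggers a checkpoint; the barrier travels downstream from the sources, and each operator that receives it saves its state to the state backend and notifies the JobManager.</span></li><li><span>When the sink connector receives the barrier, it saves its current state to the checkpoint, notifies the JobManager, and opens the next transaction for the data that belongs to the next checkpoint.</span></li><li><span>Once the JobManager has been notified by all tasks, it sends a confirmation that the checkpoint is complete.</span></li><li><span>On receiving the JobManager's confirmation, the sink task commits the data written during that period.</span></li><li><span>Kafka closes the transaction, and the committed data can now be consumed normally.</span></li></ol><h1 id='背压问题'><span>Backpressure</span></h1><p><span>The essence of backpressure: per unit of time, more data is enqueued into a buffer than is dequeued from it. Possible remedies:</span></p><ul><li><span>Slow down the enqueue side (the producer)</span></li><li><span>Speed up the dequeue side (the consumer)</span></li><li><span>Increase the size of the buffer queue</span></li><li><span>Have the consumer tell the producer how much it can currently consume (credit)</span></li></ul><p><span>The downstream operator sends a message to the upstream operator announcing how much data its receive buffer can accept. Once the upstream operator's send buffer holds an amount of data </span><mjx-container class="MathJax" jax="SVG" style="position: relative;"><svg xmlns="http://www.w3.org/2000/svg" width="1.76ex" height="1.751ex" role="img" focusable="false" viewBox="0 -636 778 774" xmlns:xlink="http://www.w3.org/1999/xlink" aria-hidden="true" style="vertical-align: -0.312ex;"><defs><path id="MJX-28-TEX-N-2264" d="M674 636Q682 636 688 630T694 615T687 601Q686 600 417 472L151 346L399 228Q687 92 691 87Q694 81 694 76Q694 58 676 56H670L382 192Q92 329 90 331Q83 336 83 348Q84 359 96 365Q104 369 382 500T665 634Q669 636 674 636ZM84 -118Q84 -108 99 -98H678Q694 -104 694 -118Q694 -130 679 -138H98Q84 -131 84 -118Z"></path></defs><g stroke="currentColor" fill="currentColor" stroke-width="0" transform="scale(1,-1)"><g data-mml-node="math"><g data-mml-node="mo"><use data-c="2264" xlink:href="#MJX-28-TEX-N-2264"></use></g></g></g></svg><mjx-assistive-mml unselectable="on" display="inline"><math 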
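xmlns="http://www.w3.org/1998/Math/MathML"><mo>≤</mo></math></mjx-assistive-mml></mjx-container><script type="math/tex">\le</script><span> the amount the downstream receive buffer can accept, it sends that data downstream.</span></p><p><span>For example:</span></p><p><span>Suppose the downstream operator's receive buffer has a capacity of 4 and currently holds 3 elements, so it can accept 1 more. It sends a message telling the upstream operator that it has 1 free slot; as soon as the upstream send buffer contains 1 record, that record is sent straight to the downstream receive buffer.</span></p><p><strong><span>The credit algorithm</span></strong></p><p><span>Credit: the number of records the downstream operator's receive buffer can still accept.</span></p><p><strong><span>Credit-based flow control</span></strong></p><p><span>Sending each record individually over a network connection is inefficient and incurs significant overhead. To make full use of the connection's bandwidth, records need to be buffered. In a streaming context, one drawback of buffering is added latency, because records are collected in a buffer instead of being sent immediately.</span></p><p><span>Flink implements a credit-based flow control mechanism that works as follows. A receiving task grants a sending task some credit, that is, the number of network buffers reserved for receiving its data. When a sender receives a credit notification, it sends as many buffers as it was granted, together with the size of its current backlog, which is the number of network buffers that are filled and ready to be sent. The receiver processes the incoming data using the reserved buffers and uses the backlog sizes reported by all of its connected senders to prioritize the next credit grants.</span></p><p><span>Credit-based flow control reduces latency because senders can ship data as soon as the receiver has enough resources to accept it. It is also an effective way to distribute network resources under data skew, because credit is granted according to the size of each sender's backlog. Credit-based flow control is therefore an important building block for Flink's high throughput and low latency.</span></p><h1 id='有限状态机'><span>Finite State Machines</span></h1><figure><img src="figure/47.svg" alt="Deciding whether a string contains an even number of 0s" style="width:100%;display:block;margin-left:auto;margin-right:auto;"><figcaption style="text-align:center;color:brown;">Figure 49: Deciding whether a string contains an even number of 0s</figcaption></figure><p><a href='https://nightlies.apache.org/flink/flink-docs-master/zh/docs/libs/cep/'><span>Flink CEP official documentation</span></a></p></div></div>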
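<p><span>The finite state machine in Figure 49 can be sketched in plain Java as follows. This is a minimal illustration, not Flink CEP code: the class name EvenZerosFsm, the state names EVEN and ODD, and the methods step and hasEvenZeros are all assumptions made for this example. The machine starts in EVEN, flips its state on every '0', ignores every other character, and accepts the input if it ends in EVEN.</span></p>

```java
public class EvenZerosFsm {
    // The two states of the automaton. EVEN is both the start state and the
    // accepting state (names are illustrative, not taken from Flink).
    enum State { EVEN, ODD }

    // Transition function: a '0' flips the state, any other character leaves it unchanged.
    static State step(State current, char symbol) {
        if (symbol != '0') {
            return current;
        }
        return current == State.EVEN ? State.ODD : State.EVEN;
    }

    // Runs the automaton over the whole input and reports whether it ends in the accepting state.
    static boolean hasEvenZeros(String input) {
        State state = State.EVEN;
        for (char symbol : input.toCharArray()) {
            state = step(state, symbol);
        }
        return state == State.EVEN;
    }

    public static void main(String[] args) {
        // Two sample inputs: one with two zeros, one with a single zero.
        System.out.println(hasEvenZeros("1001"));
        System.out.println(hasEvenZeros("10"));
    }
}
```

<p><span>Keeping the transition function separate from the driver loop mirrors how the state diagram is drawn: each arrow in the figure corresponds to one branch of step, and the loop simply feeds the input through it one character at a time.</span></p>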
</body>
</html>